You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/04/19 13:09:23 UTC
[14/18] beam git commit: [BEAM-1994] Remove Flink examples package
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
deleted file mode 100644
index 8f50105..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.options.PipelineOptionsValidator;
-import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.util.UserCodeException;
-
-/**
- * Test Flink runner.
- */
-public class TestFlinkRunner extends PipelineRunner<PipelineResult> {
-
- private FlinkRunner delegate;
-
- private TestFlinkRunner(FlinkPipelineOptions options) {
- // We use [auto] for testing since this will make it pick up the Testing ExecutionEnvironment
- options.setFlinkMaster("[auto]");
- this.delegate = FlinkRunner.fromOptions(options);
- }
-
- public static TestFlinkRunner fromOptions(PipelineOptions options) {
- FlinkPipelineOptions flinkOptions =
- PipelineOptionsValidator.validate(FlinkPipelineOptions.class, options);
- return new TestFlinkRunner(flinkOptions);
- }
-
- public static TestFlinkRunner create(boolean streaming) {
- FlinkPipelineOptions flinkOptions = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
- flinkOptions.setRunner(TestFlinkRunner.class);
- flinkOptions.setStreaming(streaming);
- return TestFlinkRunner.fromOptions(flinkOptions);
- }
-
- @Override
- public PipelineResult run(Pipeline pipeline) {
- try {
- return delegate.run(pipeline);
- } catch (Throwable t) {
- // Special case hack to pull out assertion errors from PAssert; instead there should
- // probably be a better story along the lines of UserCodeException.
- UserCodeException innermostUserCodeException = null;
- Throwable current = t;
- for (; current.getCause() != null; current = current.getCause()) {
- if (current instanceof UserCodeException) {
- innermostUserCodeException = ((UserCodeException) current);
- }
- }
- if (innermostUserCodeException != null) {
- current = innermostUserCodeException.getCause();
- }
- if (current instanceof AssertionError) {
- throw (AssertionError) current;
- }
- throw new PipelineExecutionException(current);
- }
- }
-
- public PipelineOptions getPipelineOptions() {
- return delegate.getPipelineOptions();
- }
-}
-
-
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TranslationMode.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TranslationMode.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TranslationMode.java
deleted file mode 100644
index ad54750..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TranslationMode.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink;
-
-/**
- * The translation mode of the Beam Pipeline.
- */
-enum TranslationMode {
-
- /** Uses the batch mode of Flink. */
- BATCH,
-
- /** Uses the streaming mode of Flink. */
- STREAMING
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/package-info.java
deleted file mode 100644
index 57f1e59..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Internal implementation of the Beam runner for Apache Flink.
- */
-package org.apache.beam.runners.flink;
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java
deleted file mode 100644
index fb2493b..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import org.apache.beam.runners.core.AggregatorFactory;
-import org.apache.beam.runners.core.ExecutionContext;
-import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.flink.api.common.functions.RuntimeContext;
-
-/**
- * A {@link AggregatorFactory} for the Flink Batch Runner.
- */
-public class FlinkAggregatorFactory implements AggregatorFactory{
-
- private final RuntimeContext runtimeContext;
-
- public FlinkAggregatorFactory(RuntimeContext runtimeContext) {
- this.runtimeContext = runtimeContext;
- }
-
- @Override
- public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
- Class<?> fnClass, ExecutionContext.StepContext stepContext, String aggregatorName,
- Combine.CombineFn<InputT, AccumT, OutputT> combine) {
- @SuppressWarnings("unchecked")
- SerializableFnAggregatorWrapper<InputT, OutputT> result =
- (SerializableFnAggregatorWrapper<InputT, OutputT>)
- runtimeContext.getAccumulator(aggregatorName);
-
- if (result == null) {
- result = new SerializableFnAggregatorWrapper<>(combine);
- runtimeContext.addAccumulator(aggregatorName, result);
- }
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java
deleted file mode 100644
index 447b1e5..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.collect.Iterables;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.joda.time.Instant;
-
-/**
- * {@link org.apache.beam.sdk.transforms.windowing.WindowFn.AssignContext} for
- * Flink functions.
- */
-class FlinkAssignContext<InputT, W extends BoundedWindow>
- extends WindowFn<InputT, W>.AssignContext {
- private final WindowedValue<InputT> value;
-
- FlinkAssignContext(WindowFn<InputT, W> fn, WindowedValue<InputT> value) {
- fn.super();
- checkArgument(
- Iterables.size(value.getWindows()) == 1,
- String.format(
- "%s passed to window assignment must be in a single window, but it was in %s: %s",
- WindowedValue.class.getSimpleName(),
- Iterables.size(value.getWindows()),
- value.getWindows()));
- this.value = value;
- }
-
- @Override
- public InputT element() {
- return value.getValue();
- }
-
- @Override
- public Instant timestamp() {
- return value.getTimestamp();
- }
-
- @Override
- public BoundedWindow window() {
- return Iterables.getOnlyElement(value.getWindows());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java
deleted file mode 100644
index c3a5095..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import java.util.Collection;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.util.Collector;
-
-/**
- * Flink {@link FlatMapFunction} for implementing
- * {@link org.apache.beam.sdk.transforms.windowing.Window.Assign}.
- */
-public class FlinkAssignWindows<T, W extends BoundedWindow>
- implements FlatMapFunction<WindowedValue<T>, WindowedValue<T>> {
-
- private final WindowFn<T, W> windowFn;
-
- public FlinkAssignWindows(WindowFn<T, W> windowFn) {
- this.windowFn = windowFn;
- }
-
- @Override
- public void flatMap(
- WindowedValue<T> input, Collector<WindowedValue<T>> collector) throws Exception {
- Collection<W> windows = windowFn.assignWindows(new FlinkAssignContext<>(windowFn, input));
- for (W window: windows) {
- collector.collect(
- WindowedValue.of(input.getValue(), input.getTimestamp(), window, input.getPane()));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
deleted file mode 100644
index 51582af..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import java.util.Collections;
-import java.util.Map;
-import org.apache.beam.runners.core.DoFnRunner;
-import org.apache.beam.runners.core.DoFnRunners;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.join.RawUnionValue;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.flink.api.common.functions.RichMapPartitionFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.Collector;
-
-/**
- * Encapsulates a {@link DoFn}
- * inside a Flink {@link org.apache.flink.api.common.functions.RichMapPartitionFunction}.
- *
- * <p>We get a mapping from {@link org.apache.beam.sdk.values.TupleTag} to output index
- * and must tag all outputs with the output number. Afterwards a filter will filter out
- * those elements that are not to be in a specific output.
- */
-public class FlinkDoFnFunction<InputT, OutputT>
- extends RichMapPartitionFunction<WindowedValue<InputT>, WindowedValue<OutputT>> {
-
- private final SerializedPipelineOptions serializedOptions;
-
- private final DoFn<InputT, OutputT> doFn;
- private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
-
- private final WindowingStrategy<?, ?> windowingStrategy;
-
- private final Map<TupleTag<?>, Integer> outputMap;
- private final TupleTag<OutputT> mainOutputTag;
-
- private transient DoFnInvoker<InputT, OutputT> doFnInvoker;
-
- public FlinkDoFnFunction(
- DoFn<InputT, OutputT> doFn,
- WindowingStrategy<?, ?> windowingStrategy,
- Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
- PipelineOptions options,
- Map<TupleTag<?>, Integer> outputMap,
- TupleTag<OutputT> mainOutputTag) {
-
- this.doFn = doFn;
- this.sideInputs = sideInputs;
- this.serializedOptions = new SerializedPipelineOptions(options);
- this.windowingStrategy = windowingStrategy;
- this.outputMap = outputMap;
- this.mainOutputTag = mainOutputTag;
-
- }
-
- @Override
- public void mapPartition(
- Iterable<WindowedValue<InputT>> values,
- Collector<WindowedValue<OutputT>> out) throws Exception {
-
- RuntimeContext runtimeContext = getRuntimeContext();
-
- DoFnRunners.OutputManager outputManager;
- if (outputMap == null) {
- outputManager = new FlinkDoFnFunction.DoFnOutputManager(out);
- } else {
- // it has some additional outputs
- outputManager =
- new FlinkDoFnFunction.MultiDoFnOutputManager((Collector) out, outputMap);
- }
-
- DoFnRunner<InputT, OutputT> doFnRunner = DoFnRunners.simpleRunner(
- serializedOptions.getPipelineOptions(), doFn,
- new FlinkSideInputReader(sideInputs, runtimeContext),
- outputManager,
- mainOutputTag,
- // see SimpleDoFnRunner, just use it to limit number of additional outputs
- Collections.<TupleTag<?>>emptyList(),
- new FlinkNoOpStepContext(),
- new FlinkAggregatorFactory(runtimeContext),
- windowingStrategy);
-
- doFnRunner.startBundle();
-
- for (WindowedValue<InputT> value : values) {
- doFnRunner.processElement(value);
- }
-
- doFnRunner.finishBundle();
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- doFnInvoker = DoFnInvokers.invokerFor(doFn);
- doFnInvoker.invokeSetup();
- }
-
- @Override
- public void close() throws Exception {
- doFnInvoker.invokeTeardown();
- }
-
- static class DoFnOutputManager
- implements DoFnRunners.OutputManager {
-
- private Collector collector;
-
- DoFnOutputManager(Collector collector) {
- this.collector = collector;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
- collector.collect(output);
- }
- }
-
- static class MultiDoFnOutputManager
- implements DoFnRunners.OutputManager {
-
- private Collector<WindowedValue<RawUnionValue>> collector;
- private Map<TupleTag<?>, Integer> outputMap;
-
- MultiDoFnOutputManager(Collector<WindowedValue<RawUnionValue>> collector,
- Map<TupleTag<?>, Integer> outputMap) {
- this.collector = collector;
- this.outputMap = outputMap;
- }
-
- @Override
- public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
- collector.collect(WindowedValue.of(new RawUnionValue(outputMap.get(tag), output.getValue()),
- output.getTimestamp(), output.getWindows(), output.getPane()));
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
deleted file mode 100644
index 26fd0b4..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.runners.core.PerKeyCombineFnRunners;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.CombineFnBase;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
-
-/**
- * Special version of {@link FlinkReduceFunction} that supports merging windows. This
- * assumes that the windows are {@link IntervalWindow IntervalWindows} and exhibits the
- * same behaviour as {@code MergeOverlappingIntervalWindows}.
- *
- * <p>This is different from the pair of function for the non-merging windows case
- * in that we cannot do combining before the shuffle because elements would not
- * yet be in their correct windows for side-input access.
- */
-public class FlinkMergingNonShuffleReduceFunction<
- K, InputT, AccumT, OutputT, W extends IntervalWindow>
- extends RichGroupReduceFunction<WindowedValue<KV<K, InputT>>, WindowedValue<KV<K, OutputT>>> {
-
- private final CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> combineFn;
-
- private final WindowingStrategy<?, W> windowingStrategy;
-
- private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
-
- private final SerializedPipelineOptions serializedOptions;
-
- public FlinkMergingNonShuffleReduceFunction(
- CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn,
- WindowingStrategy<?, W> windowingStrategy,
- Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
- PipelineOptions pipelineOptions) {
-
- this.combineFn = keyedCombineFn;
-
- this.windowingStrategy = windowingStrategy;
- this.sideInputs = sideInputs;
-
- this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
-
- }
-
- @Override
- public void reduce(
- Iterable<WindowedValue<KV<K, InputT>>> elements,
- Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {
-
- PipelineOptions options = serializedOptions.getPipelineOptions();
-
- FlinkSideInputReader sideInputReader =
- new FlinkSideInputReader(sideInputs, getRuntimeContext());
-
- PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> combineFnRunner =
- PerKeyCombineFnRunners.create(combineFn);
-
- @SuppressWarnings("unchecked")
- OutputTimeFn<? super BoundedWindow> outputTimeFn =
- (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
-
- // get all elements so that we can sort them, has to fit into
- // memory
- // this seems very unprudent, but correct, for now
- List<WindowedValue<KV<K, InputT>>> sortedInput = Lists.newArrayList();
- for (WindowedValue<KV<K, InputT>> inputValue : elements) {
- for (WindowedValue<KV<K, InputT>> exploded : inputValue.explodeWindows()) {
- sortedInput.add(exploded);
- }
- }
- Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, InputT>>>() {
- @Override
- public int compare(
- WindowedValue<KV<K, InputT>> o1,
- WindowedValue<KV<K, InputT>> o2) {
- return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp()
- .compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp());
- }
- });
-
- // merge windows, we have to do it in an extra pre-processing step and
- // can't do it as we go since the window of early elements would not
- // be correct when calling the CombineFn
- mergeWindow(sortedInput);
-
- // iterate over the elements that are sorted by window timestamp
- final Iterator<WindowedValue<KV<K, InputT>>> iterator = sortedInput.iterator();
-
- // create accumulator using the first elements key
- WindowedValue<KV<K, InputT>> currentValue = iterator.next();
- K key = currentValue.getValue().getKey();
- IntervalWindow currentWindow =
- (IntervalWindow) Iterables.getOnlyElement(currentValue.getWindows());
- InputT firstValue = currentValue.getValue().getValue();
- AccumT accumulator =
- combineFnRunner.createAccumulator(key, options, sideInputReader, currentValue.getWindows());
- accumulator = combineFnRunner.addInput(key, accumulator, firstValue,
- options, sideInputReader, currentValue.getWindows());
-
- // we use this to keep track of the timestamps assigned by the OutputTimeFn
- Instant windowTimestamp =
- outputTimeFn.assignOutputTime(currentValue.getTimestamp(), currentWindow);
-
- while (iterator.hasNext()) {
- WindowedValue<KV<K, InputT>> nextValue = iterator.next();
- IntervalWindow nextWindow =
- (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows());
-
- if (currentWindow.equals(nextWindow)) {
- // continue accumulating and merge windows
-
- InputT value = nextValue.getValue().getValue();
- accumulator = combineFnRunner.addInput(key, accumulator, value,
- options, sideInputReader, currentValue.getWindows());
-
- windowTimestamp = outputTimeFn.combine(
- windowTimestamp,
- outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow));
-
- } else {
- // emit the value that we currently have
- out.collect(
- WindowedValue.of(
- KV.of(key, combineFnRunner.extractOutput(key, accumulator,
- options, sideInputReader, currentValue.getWindows())),
- windowTimestamp,
- currentWindow,
- PaneInfo.NO_FIRING));
-
- currentWindow = nextWindow;
- currentValue = nextValue;
- InputT value = nextValue.getValue().getValue();
- accumulator = combineFnRunner.createAccumulator(key,
- options, sideInputReader, currentValue.getWindows());
- accumulator = combineFnRunner.addInput(key, accumulator, value,
- options, sideInputReader, currentValue.getWindows());
- windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
- }
-
- }
-
- // emit the final accumulator
- out.collect(
- WindowedValue.of(
- KV.of(key, combineFnRunner.extractOutput(key, accumulator,
- options, sideInputReader, currentValue.getWindows())),
- windowTimestamp,
- currentWindow,
- PaneInfo.NO_FIRING));
- }
-
- /**
- * Merge windows. This assumes that the list of elements is sorted by window-end timestamp.
- * This replaces windows in the input list.
- */
- private void mergeWindow(List<WindowedValue<KV<K, InputT>>> elements) {
- int currentStart = 0;
- IntervalWindow currentWindow =
- (IntervalWindow) Iterables.getOnlyElement(elements.get(0).getWindows());
-
- for (int i = 1; i < elements.size(); i++) {
- WindowedValue<KV<K, InputT>> nextValue = elements.get(i);
- IntervalWindow nextWindow =
- (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows());
- if (currentWindow.intersects(nextWindow)) {
- // we continue
- currentWindow = currentWindow.span(nextWindow);
- } else {
- // retrofit the merged window to all windows up to "currentStart"
- for (int j = i - 1; j >= currentStart; j--) {
- WindowedValue<KV<K, InputT>> value = elements.get(j);
- elements.set(
- j,
- WindowedValue.of(
- value.getValue(), value.getTimestamp(), currentWindow, value.getPane()));
- }
- currentStart = i;
- currentWindow = nextWindow;
- }
- }
- if (currentStart < elements.size() - 1) {
- // we have to retrofit the last batch
- for (int j = elements.size() - 1; j >= currentStart; j--) {
- WindowedValue<KV<K, InputT>> value = elements.get(j);
- elements.set(
- j,
- WindowedValue.of(
- value.getValue(), value.getTimestamp(), currentWindow, value.getPane()));
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
deleted file mode 100644
index c68f155..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.runners.core.PerKeyCombineFnRunners;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.CombineFnBase;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
-
-/**
- * Special version of {@link FlinkPartialReduceFunction} that supports merging windows. This
- * assumes that the windows are {@link IntervalWindow IntervalWindows} and exhibits the
- * same behaviour as {@code MergeOverlappingIntervalWindows}.
- */
-public class FlinkMergingPartialReduceFunction<K, InputT, AccumT, W extends IntervalWindow>
- extends FlinkPartialReduceFunction<K, InputT, AccumT, W> {
-
- public FlinkMergingPartialReduceFunction(
- CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn,
- WindowingStrategy<?, W> windowingStrategy,
- Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
- PipelineOptions pipelineOptions) {
- super(combineFn, windowingStrategy, sideInputs, pipelineOptions);
- }
-
- @Override
- public void combine(
- Iterable<WindowedValue<KV<K, InputT>>> elements,
- Collector<WindowedValue<KV<K, AccumT>>> out) throws Exception {
-
- PipelineOptions options = serializedOptions.getPipelineOptions();
-
- FlinkSideInputReader sideInputReader =
- new FlinkSideInputReader(sideInputs, getRuntimeContext());
-
- PerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner =
- PerKeyCombineFnRunners.create(combineFn);
-
- @SuppressWarnings("unchecked")
- OutputTimeFn<? super BoundedWindow> outputTimeFn =
- (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
-
- // get all elements so that we can sort them, has to fit into
- // memory
- // this seems very unprudent, but correct, for now
- List<WindowedValue<KV<K, InputT>>> sortedInput = Lists.newArrayList();
- for (WindowedValue<KV<K, InputT>> inputValue : elements) {
- for (WindowedValue<KV<K, InputT>> exploded : inputValue.explodeWindows()) {
- sortedInput.add(exploded);
- }
- }
- Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, InputT>>>() {
- @Override
- public int compare(
- WindowedValue<KV<K, InputT>> o1,
- WindowedValue<KV<K, InputT>> o2) {
- return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp()
- .compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp());
- }
- });
-
- // merge windows, we have to do it in an extra pre-processing step and
- // can't do it as we go since the window of early elements would not
- // be correct when calling the CombineFn
- mergeWindow(sortedInput);
-
- // iterate over the elements that are sorted by window timestamp
- final Iterator<WindowedValue<KV<K, InputT>>> iterator = sortedInput.iterator();
-
- // create accumulator using the first elements key
- WindowedValue<KV<K, InputT>> currentValue = iterator.next();
- K key = currentValue.getValue().getKey();
- IntervalWindow currentWindow =
- (IntervalWindow) Iterables.getOnlyElement(currentValue.getWindows());
- InputT firstValue = currentValue.getValue().getValue();
- AccumT accumulator = combineFnRunner.createAccumulator(key,
- options, sideInputReader, currentValue.getWindows());
- accumulator = combineFnRunner.addInput(key, accumulator, firstValue,
- options, sideInputReader, currentValue.getWindows());
-
- // we use this to keep track of the timestamps assigned by the OutputTimeFn
- Instant windowTimestamp =
- outputTimeFn.assignOutputTime(currentValue.getTimestamp(), currentWindow);
-
- while (iterator.hasNext()) {
- WindowedValue<KV<K, InputT>> nextValue = iterator.next();
- IntervalWindow nextWindow = (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows());
-
- if (currentWindow.equals(nextWindow)) {
- // continue accumulating and merge windows
-
- InputT value = nextValue.getValue().getValue();
- accumulator = combineFnRunner.addInput(key, accumulator, value,
- options, sideInputReader, currentValue.getWindows());
-
- windowTimestamp = outputTimeFn.combine(
- windowTimestamp,
- outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow));
-
- } else {
- // emit the value that we currently have
- out.collect(
- WindowedValue.of(
- KV.of(key, accumulator),
- windowTimestamp,
- currentWindow,
- PaneInfo.NO_FIRING));
-
- currentWindow = nextWindow;
- currentValue = nextValue;
- InputT value = nextValue.getValue().getValue();
- accumulator = combineFnRunner.createAccumulator(key,
- options, sideInputReader, currentValue.getWindows());
- accumulator = combineFnRunner.addInput(key, accumulator, value,
- options, sideInputReader, currentValue.getWindows());
- windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
- }
- }
-
- // emit the final accumulator
- out.collect(
- WindowedValue.of(
- KV.of(key, accumulator),
- windowTimestamp,
- currentWindow,
- PaneInfo.NO_FIRING));
- }
-
- /**
- * Merge windows. This assumes that the list of elements is sorted by window-end timestamp.
- * This replaces windows in the input list.
- */
- private void mergeWindow(List<WindowedValue<KV<K, InputT>>> elements) {
- int currentStart = 0;
- IntervalWindow currentWindow =
- (IntervalWindow) Iterables.getOnlyElement(elements.get(0).getWindows());
-
- for (int i = 1; i < elements.size(); i++) {
- WindowedValue<KV<K, InputT>> nextValue = elements.get(i);
- IntervalWindow nextWindow =
- (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows());
- if (currentWindow.intersects(nextWindow)) {
- // we continue
- currentWindow = currentWindow.span(nextWindow);
- } else {
- // retrofit the merged window to all windows up to "currentStart"
- for (int j = i - 1; j >= currentStart; j--) {
- WindowedValue<KV<K, InputT>> value = elements.get(j);
- elements.set(
- j,
- WindowedValue.of(
- value.getValue(), value.getTimestamp(), currentWindow, value.getPane()));
- }
- currentStart = i;
- currentWindow = nextWindow;
- }
- }
- if (currentStart < elements.size() - 1) {
- // we have to retrofit the last batch
- for (int j = elements.size() - 1; j >= currentStart; j--) {
- WindowedValue<KV<K, InputT>> value = elements.get(j);
- elements.set(
- j,
- WindowedValue.of(
- value.getValue(), value.getTimestamp(), currentWindow, value.getPane()));
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
deleted file mode 100644
index 84b3adc..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.runners.core.PerKeyCombineFnRunners;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.CombineFnBase;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
-
-/**
- * Special version of {@link FlinkReduceFunction} that supports merging windows. This
- * assumes that the windows are {@link IntervalWindow IntervalWindows} and exhibits the
- * same behaviour as {@code MergeOverlappingIntervalWindows}.
- */
-public class FlinkMergingReduceFunction<K, AccumT, OutputT, W extends IntervalWindow>
- extends FlinkReduceFunction<K, AccumT, OutputT, W> {
-
- public FlinkMergingReduceFunction(
- CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> keyedCombineFn,
- WindowingStrategy<?, W> windowingStrategy,
- Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
- PipelineOptions pipelineOptions) {
- super(keyedCombineFn, windowingStrategy, sideInputs, pipelineOptions);
- }
-
- @Override
- public void reduce(
- Iterable<WindowedValue<KV<K, AccumT>>> elements,
- Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {
-
- PipelineOptions options = serializedOptions.getPipelineOptions();
-
- FlinkSideInputReader sideInputReader =
- new FlinkSideInputReader(sideInputs, getRuntimeContext());
-
- PerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner =
- PerKeyCombineFnRunners.create(combineFn);
-
- @SuppressWarnings("unchecked")
- OutputTimeFn<? super BoundedWindow> outputTimeFn =
- (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
-
- // get all elements so that we can sort them, has to fit into
- // memory
- // this seems very unprudent, but correct, for now
- ArrayList<WindowedValue<KV<K, AccumT>>> sortedInput = Lists.newArrayList();
- for (WindowedValue<KV<K, AccumT>> inputValue : elements) {
- for (WindowedValue<KV<K, AccumT>> exploded : inputValue.explodeWindows()) {
- sortedInput.add(exploded);
- }
- }
- Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, AccumT>>>() {
- @Override
- public int compare(
- WindowedValue<KV<K, AccumT>> o1,
- WindowedValue<KV<K, AccumT>> o2) {
- return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp()
- .compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp());
- }
- });
-
- // merge windows, we have to do it in an extra pre-processing step and
- // can't do it as we go since the window of early elements would not
- // be correct when calling the CombineFn
- mergeWindow(sortedInput);
-
- // iterate over the elements that are sorted by window timestamp
- final Iterator<WindowedValue<KV<K, AccumT>>> iterator = sortedInput.iterator();
-
- // get the first accumulator
- WindowedValue<KV<K, AccumT>> currentValue = iterator.next();
- K key = currentValue.getValue().getKey();
- IntervalWindow currentWindow =
- (IntervalWindow) Iterables.getOnlyElement(currentValue.getWindows());
- AccumT accumulator = currentValue.getValue().getValue();
-
- // we use this to keep track of the timestamps assigned by the OutputTimeFn,
- // in FlinkPartialReduceFunction we already merge the timestamps assigned
- // to individual elements, here we just merge them
- List<Instant> windowTimestamps = new ArrayList<>();
- windowTimestamps.add(currentValue.getTimestamp());
-
- while (iterator.hasNext()) {
- WindowedValue<KV<K, AccumT>> nextValue = iterator.next();
- IntervalWindow nextWindow =
- (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows());
-
- if (nextWindow.equals(currentWindow)) {
- // continue accumulating and merge windows
-
- accumulator = combineFnRunner.mergeAccumulators(
- key, ImmutableList.of(accumulator, nextValue.getValue().getValue()),
- options, sideInputReader, currentValue.getWindows());
-
- windowTimestamps.add(nextValue.getTimestamp());
- } else {
- out.collect(
- WindowedValue.of(
- KV.of(key, combineFnRunner.extractOutput(key, accumulator,
- options, sideInputReader, currentValue.getWindows())),
- outputTimeFn.merge(currentWindow, windowTimestamps),
- currentWindow,
- PaneInfo.NO_FIRING));
-
- windowTimestamps.clear();
-
- currentWindow = nextWindow;
- currentValue = nextValue;
- accumulator = nextValue.getValue().getValue();
- windowTimestamps.add(nextValue.getTimestamp());
- }
- }
-
- // emit the final accumulator
- out.collect(
- WindowedValue.of(
- KV.of(key, combineFnRunner.extractOutput(key, accumulator,
- options, sideInputReader, currentValue.getWindows())),
- outputTimeFn.merge(currentWindow, windowTimestamps),
- currentWindow,
- PaneInfo.NO_FIRING));
- }
-
- /**
- * Merge windows. This assumes that the list of elements is sorted by window-end timestamp.
- * This replaces windows in the input list.
- */
- private void mergeWindow(List<WindowedValue<KV<K, AccumT>>> elements) {
- int currentStart = 0;
- IntervalWindow currentWindow =
- (IntervalWindow) Iterables.getOnlyElement(elements.get(0).getWindows());
-
- for (int i = 1; i < elements.size(); i++) {
- WindowedValue<KV<K, AccumT>> nextValue = elements.get(i);
- IntervalWindow nextWindow =
- (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows());
- if (currentWindow.intersects(nextWindow)) {
- // we continue
- currentWindow = currentWindow.span(nextWindow);
- } else {
- // retrofit the merged window to all windows up to "currentStart"
- for (int j = i - 1; j >= currentStart; j--) {
- WindowedValue<KV<K, AccumT>> value = elements.get(j);
- elements.set(
- j,
- WindowedValue.of(
- value.getValue(), value.getTimestamp(), currentWindow, value.getPane()));
- }
- currentStart = i;
- currentWindow = nextWindow;
- }
- }
- if (currentStart < elements.size() - 1) {
- // we have to retrofit the last batch
- for (int j = elements.size() - 1; j >= currentStart; j--) {
- WindowedValue<KV<K, AccumT>> value = elements.get(j);
- elements.set(
- j,
- WindowedValue.of(
- value.getValue(), value.getTimestamp(), currentWindow, value.getPane()));
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputPruningFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputPruningFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputPruningFunction.java
deleted file mode 100644
index 9071cc5..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputPruningFunction.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import org.apache.beam.sdk.transforms.join.RawUnionValue;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.util.Collector;
-
-/**
- * A {@link FlatMapFunction} function that filters out those elements that don't belong in this
- * output. We need this to implement MultiOutput ParDo functions in combination with
- * {@link FlinkDoFnFunction}.
- */
-public class FlinkMultiOutputPruningFunction<T>
- implements FlatMapFunction<WindowedValue<RawUnionValue>, WindowedValue<T>> {
-
- private final int ourOutputTag;
-
- public FlinkMultiOutputPruningFunction(int ourOutputTag) {
- this.ourOutputTag = ourOutputTag;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public void flatMap(
- WindowedValue<RawUnionValue> windowedValue,
- Collector<WindowedValue<T>> collector) throws Exception {
- int unionTag = windowedValue.getValue().getUnionTag();
- if (unionTag == ourOutputTag) {
- collector.collect(
- (WindowedValue<T>) windowedValue.withValue(windowedValue.getValue().getValue()));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
deleted file mode 100644
index 847a00a..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import java.io.IOException;
-import org.apache.beam.runners.core.ExecutionContext.StepContext;
-import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-/**
- * A {@link StepContext} for Flink Batch Runner execution.
- */
-public class FlinkNoOpStepContext implements StepContext {
-
- @Override
- public String getStepName() {
- return null;
- }
-
- @Override
- public String getTransformName() {
- return null;
- }
-
- @Override
- public void noteOutput(WindowedValue<?> output) {
-
- }
-
- @Override
- public void noteOutput(TupleTag<?> tag, WindowedValue<?> output) {
-
- }
-
- @Override
- public <T, W extends BoundedWindow> void writePCollectionViewData(
- TupleTag<?> tag,
- Iterable<WindowedValue<T>> data,
- Coder<Iterable<WindowedValue<T>>> dataCoder,
- W window,
- Coder<W> windowCoder) throws IOException {
- }
-
- @Override
- public StateInternals<?> stateInternals() {
- return null;
- }
-
- @Override
- public TimerInternals timerInternals() {
- return null;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
deleted file mode 100644
index 1d1ff9f..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.Map;
-import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.runners.core.PerKeyCombineFnRunners;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.CombineFnBase;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.flink.api.common.functions.RichGroupCombineFunction;
-import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
-
-/**
- * This is is the first step for executing a {@link org.apache.beam.sdk.transforms.Combine.PerKey}
- * on Flink. The second part is {@link FlinkReduceFunction}. This function performs a local
- * combine step before shuffling while the latter does the final combination after a shuffle.
- *
- * <p>The input to {@link #combine(Iterable, Collector)} are elements of the same key but
- * for different windows. We have to ensure that we only combine elements of matching
- * windows.
- */
-public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWindow>
- extends RichGroupCombineFunction<WindowedValue<KV<K, InputT>>, WindowedValue<KV<K, AccumT>>> {
-
- protected final CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn;
-
- protected final WindowingStrategy<?, W> windowingStrategy;
-
- protected final SerializedPipelineOptions serializedOptions;
-
- protected final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
-
- public FlinkPartialReduceFunction(
- CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn,
- WindowingStrategy<?, W> windowingStrategy,
- Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
- PipelineOptions pipelineOptions) {
-
- this.combineFn = combineFn;
- this.windowingStrategy = windowingStrategy;
- this.sideInputs = sideInputs;
- this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
-
- }
-
- @Override
- public void combine(
- Iterable<WindowedValue<KV<K, InputT>>> elements,
- Collector<WindowedValue<KV<K, AccumT>>> out) throws Exception {
-
- PipelineOptions options = serializedOptions.getPipelineOptions();
-
- FlinkSideInputReader sideInputReader =
- new FlinkSideInputReader(sideInputs, getRuntimeContext());
-
- PerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner =
- PerKeyCombineFnRunners.create(combineFn);
-
- @SuppressWarnings("unchecked")
- OutputTimeFn<? super BoundedWindow> outputTimeFn =
- (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
-
- // get all elements so that we can sort them, has to fit into
- // memory
- // this seems very unprudent, but correct, for now
- ArrayList<WindowedValue<KV<K, InputT>>> sortedInput = Lists.newArrayList();
- for (WindowedValue<KV<K, InputT>> inputValue : elements) {
- for (WindowedValue<KV<K, InputT>> exploded : inputValue.explodeWindows()) {
- sortedInput.add(exploded);
- }
- }
- Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, InputT>>>() {
- @Override
- public int compare(
- WindowedValue<KV<K, InputT>> o1,
- WindowedValue<KV<K, InputT>> o2) {
- return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp()
- .compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp());
- }
- });
-
- // iterate over the elements that are sorted by window timestamp
- //
- final Iterator<WindowedValue<KV<K, InputT>>> iterator = sortedInput.iterator();
-
- // create accumulator using the first elements key
- WindowedValue<KV<K, InputT>> currentValue = iterator.next();
- K key = currentValue.getValue().getKey();
- BoundedWindow currentWindow = Iterables.getFirst(currentValue.getWindows(), null);
- InputT firstValue = currentValue.getValue().getValue();
- AccumT accumulator = combineFnRunner.createAccumulator(key,
- options, sideInputReader, currentValue.getWindows());
- accumulator = combineFnRunner.addInput(key, accumulator, firstValue,
- options, sideInputReader, currentValue.getWindows());
-
- // we use this to keep track of the timestamps assigned by the OutputTimeFn
- Instant windowTimestamp =
- outputTimeFn.assignOutputTime(currentValue.getTimestamp(), currentWindow);
-
- while (iterator.hasNext()) {
- WindowedValue<KV<K, InputT>> nextValue = iterator.next();
- BoundedWindow nextWindow = Iterables.getOnlyElement(nextValue.getWindows());
-
- if (nextWindow.equals(currentWindow)) {
- // continue accumulating
- InputT value = nextValue.getValue().getValue();
- accumulator = combineFnRunner.addInput(key, accumulator, value,
- options, sideInputReader, currentValue.getWindows());
-
- windowTimestamp = outputTimeFn.combine(
- windowTimestamp,
- outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow));
-
- } else {
- // emit the value that we currently have
- out.collect(
- WindowedValue.of(
- KV.of(key, accumulator),
- windowTimestamp,
- currentWindow,
- PaneInfo.NO_FIRING));
-
- currentWindow = nextWindow;
- currentValue = nextValue;
- InputT value = nextValue.getValue().getValue();
- accumulator = combineFnRunner.createAccumulator(key,
- options, sideInputReader, currentValue.getWindows());
- accumulator = combineFnRunner.addInput(key, accumulator, value,
- options, sideInputReader, currentValue.getWindows());
- windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
- }
- }
-
- // emit the final accumulator
- out.collect(
- WindowedValue.of(
- KV.of(key, accumulator),
- windowTimestamp,
- currentWindow,
- PaneInfo.NO_FIRING));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
deleted file mode 100644
index 3e4f742..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.core.PerKeyCombineFnRunner;
-import org.apache.beam.runners.core.PerKeyCombineFnRunners;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.CombineFnBase;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
-
-/**
- * This is the second part for executing a {@link org.apache.beam.sdk.transforms.Combine.PerKey}
- * on Flink, the first part is {@link FlinkPartialReduceFunction}. This function performs the final
- * combination of the pre-combined values after a shuffle.
- *
- * <p>The input to {@link #reduce(Iterable, Collector)} are elements of the same key but
- * for different windows. We have to ensure that we only combine elements of matching
- * windows.
- */
-public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
- extends RichGroupReduceFunction<WindowedValue<KV<K, AccumT>>, WindowedValue<KV<K, OutputT>>> {
-
- protected final CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> combineFn;
-
- protected final WindowingStrategy<?, W> windowingStrategy;
-
- protected final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
-
- protected final SerializedPipelineOptions serializedOptions;
-
- public FlinkReduceFunction(
- CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> keyedCombineFn,
- WindowingStrategy<?, W> windowingStrategy,
- Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
- PipelineOptions pipelineOptions) {
-
- this.combineFn = keyedCombineFn;
-
- this.windowingStrategy = windowingStrategy;
- this.sideInputs = sideInputs;
-
- this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
-
- }
-
- @Override
- public void reduce(
- Iterable<WindowedValue<KV<K, AccumT>>> elements,
- Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {
-
- PipelineOptions options = serializedOptions.getPipelineOptions();
-
- FlinkSideInputReader sideInputReader =
- new FlinkSideInputReader(sideInputs, getRuntimeContext());
-
- PerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner =
- PerKeyCombineFnRunners.create(combineFn);
-
- @SuppressWarnings("unchecked")
- OutputTimeFn<? super BoundedWindow> outputTimeFn =
- (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();
-
-
- // get all elements so that we can sort them; the whole group has to fit
- // into memory
- // this seems very imprudent, but it is correct, for now
- ArrayList<WindowedValue<KV<K, AccumT>>> sortedInput = Lists.newArrayList();
- for (WindowedValue<KV<K, AccumT>> inputValue: elements) {
- for (WindowedValue<KV<K, AccumT>> exploded: inputValue.explodeWindows()) {
- sortedInput.add(exploded);
- }
- }
- Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, AccumT>>>() {
- @Override
- public int compare(
- WindowedValue<KV<K, AccumT>> o1,
- WindowedValue<KV<K, AccumT>> o2) {
- return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp()
- .compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp());
- }
- });
-
- // iterate over the elements, which are now sorted by the maximum timestamp
- // of their (single) window
- final Iterator<WindowedValue<KV<K, AccumT>>> iterator = sortedInput.iterator();
-
- // get the first accumulator
- WindowedValue<KV<K, AccumT>> currentValue = iterator.next();
- K key = currentValue.getValue().getKey();
- BoundedWindow currentWindow = Iterables.getFirst(currentValue.getWindows(), null);
- AccumT accumulator = currentValue.getValue().getValue();
-
- // we use this to keep track of the timestamps assigned by the OutputTimeFn.
- // FlinkPartialReduceFunction already merged the timestamps assigned to the
- // individual elements, so here we only merge those pre-merged timestamps
- List<Instant> windowTimestamps = new ArrayList<>();
- windowTimestamps.add(currentValue.getTimestamp());
-
- while (iterator.hasNext()) {
- WindowedValue<KV<K, AccumT>> nextValue = iterator.next();
- BoundedWindow nextWindow = Iterables.getOnlyElement(nextValue.getWindows());
-
- if (nextWindow.equals(currentWindow)) {
- // continue accumulating
- accumulator = combineFnRunner.mergeAccumulators(
- key, ImmutableList.of(accumulator, nextValue.getValue().getValue()),
- options, sideInputReader, currentValue.getWindows());
-
- windowTimestamps.add(nextValue.getTimestamp());
- } else {
- // emit the value that we currently have
- out.collect(
- WindowedValue.of(
- KV.of(key, combineFnRunner.extractOutput(key, accumulator,
- options, sideInputReader, currentValue.getWindows())),
- outputTimeFn.merge(currentWindow, windowTimestamps),
- currentWindow,
- PaneInfo.NO_FIRING));
-
- windowTimestamps.clear();
-
- currentWindow = nextWindow;
- currentValue = nextValue;
- accumulator = nextValue.getValue().getValue();
- windowTimestamps.add(nextValue.getTimestamp());
- }
-
- }
-
- // emit the final accumulator
- out.collect(
- WindowedValue.of(
- KV.of(key, combineFnRunner.extractOutput(key, accumulator,
- options, sideInputReader, currentValue.getWindows())),
- outputTimeFn.merge(currentWindow, windowTimestamps),
- currentWindow,
- PaneInfo.NO_FIRING));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java
deleted file mode 100644
index c317182..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.SideInputReader;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.flink.api.common.functions.RuntimeContext;
-
-/**
- * A {@link SideInputReader} for the Flink Batch Runner.
- */
-public class FlinkSideInputReader implements SideInputReader {
-
- private final Map<TupleTag<?>, WindowingStrategy<?, ?>> sideInputs;
-
- private RuntimeContext runtimeContext;
-
- public FlinkSideInputReader(Map<PCollectionView<?>, WindowingStrategy<?, ?>> indexByView,
- RuntimeContext runtimeContext) {
- sideInputs = new HashMap<>();
- for (Map.Entry<PCollectionView<?>, WindowingStrategy<?, ?>> entry : indexByView.entrySet()) {
- sideInputs.put(entry.getKey().getTagInternal(), entry.getValue());
- }
- this.runtimeContext = runtimeContext;
- }
-
- @Nullable
- @Override
- public <T> T get(PCollectionView<T> view, BoundedWindow window) {
- checkNotNull(view, "View passed to sideInput cannot be null");
- TupleTag<Iterable<WindowedValue<?>>> tag = view.getTagInternal();
- checkNotNull(
- sideInputs.get(tag),
- "Side input for " + view + " not available.");
-
- Map<BoundedWindow, T> sideInputs =
- runtimeContext.getBroadcastVariableWithInitializer(
- tag.getId(), new SideInputInitializer<>(view));
- T result = sideInputs.get(window);
- if (result == null) {
- result = view.getViewFn().apply(Collections.<WindowedValue<?>>emptyList());
- }
- return result;
- }
-
- @Override
- public <T> boolean contains(PCollectionView<T> view) {
- return sideInputs.containsKey(view.getTagInternal());
- }
-
- @Override
- public boolean isEmpty() {
- return sideInputs.isEmpty();
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
deleted file mode 100644
index c8193d2..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.Map;
-import org.apache.beam.runners.core.DoFnRunner;
-import org.apache.beam.runners.core.DoFnRunners;
-import org.apache.beam.runners.core.InMemoryStateInternals;
-import org.apache.beam.runners.core.InMemoryTimerInternals;
-import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateNamespaces;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
-
-/**
- * A {@link RichGroupReduceFunction} for stateful {@link ParDo} in Flink Batch Runner.
- */
-public class FlinkStatefulDoFnFunction<K, V, OutputT>
- extends RichGroupReduceFunction<WindowedValue<KV<K, V>>, WindowedValue<OutputT>> {
-
- private final DoFn<KV<K, V>, OutputT> dofn;
- private final WindowingStrategy<?, ?> windowingStrategy;
- private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
- private final SerializedPipelineOptions serializedOptions;
- private final Map<TupleTag<?>, Integer> outputMap;
- private final TupleTag<OutputT> mainOutputTag;
- private transient DoFnInvoker doFnInvoker;
-
- public FlinkStatefulDoFnFunction(
- DoFn<KV<K, V>, OutputT> dofn,
- WindowingStrategy<?, ?> windowingStrategy,
- Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
- PipelineOptions pipelineOptions,
- Map<TupleTag<?>, Integer> outputMap,
- TupleTag<OutputT> mainOutputTag) {
-
- this.dofn = dofn;
- this.windowingStrategy = windowingStrategy;
- this.sideInputs = sideInputs;
- this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
- this.outputMap = outputMap;
- this.mainOutputTag = mainOutputTag;
- }
-
- @Override
- public void reduce(
- Iterable<WindowedValue<KV<K, V>>> values,
- Collector<WindowedValue<OutputT>> out) throws Exception {
- RuntimeContext runtimeContext = getRuntimeContext();
-
- DoFnRunners.OutputManager outputManager;
- if (outputMap == null) {
- outputManager = new FlinkDoFnFunction.DoFnOutputManager(out);
- } else {
- // it has some additional Outputs
- outputManager =
- new FlinkDoFnFunction.MultiDoFnOutputManager((Collector) out, outputMap);
- }
-
- final Iterator<WindowedValue<KV<K, V>>> iterator = values.iterator();
-
- // get the first value, we need this for initializing the state internals with the key.
- // we are guaranteed to have a first value, otherwise reduce() would not have been called.
- WindowedValue<KV<K, V>> currentValue = iterator.next();
- final K key = currentValue.getValue().getKey();
-
- final InMemoryStateInternals<K> stateInternals = InMemoryStateInternals.forKey(key);
-
- // Used with Batch, we know that all the data is available for this key. We can't use the
- // timer manager from the context because it doesn't exist. So we create one and advance
- // time to the end after processing all elements.
- final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
- timerInternals.advanceProcessingTime(Instant.now());
- timerInternals.advanceSynchronizedProcessingTime(Instant.now());
-
- DoFnRunner<KV<K, V>, OutputT> doFnRunner = DoFnRunners.simpleRunner(
- serializedOptions.getPipelineOptions(), dofn,
- new FlinkSideInputReader(sideInputs, runtimeContext),
- outputManager,
- mainOutputTag,
- // see SimpleDoFnRunner, just use it to limit number of additional outputs
- Collections.<TupleTag<?>>emptyList(),
- new FlinkNoOpStepContext() {
- @Override
- public StateInternals<?> stateInternals() {
- return stateInternals;
- }
- @Override
- public TimerInternals timerInternals() {
- return timerInternals;
- }
- },
- new FlinkAggregatorFactory(runtimeContext),
- windowingStrategy);
-
- doFnRunner.startBundle();
-
- doFnRunner.processElement(currentValue);
- while (iterator.hasNext()) {
- currentValue = iterator.next();
- doFnRunner.processElement(currentValue);
- }
-
- // Finish any pending windows by advancing the input watermark to infinity.
- timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
-
- // Finally, advance the processing time to infinity to fire any timers.
- timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
- timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
-
- fireEligibleTimers(timerInternals, doFnRunner);
-
- doFnRunner.finishBundle();
- }
-
- private void fireEligibleTimers(
- InMemoryTimerInternals timerInternals, DoFnRunner<KV<K, V>, OutputT> runner)
- throws Exception {
-
- while (true) {
-
- TimerInternals.TimerData timer;
- boolean hasFired = false;
-
- while ((timer = timerInternals.removeNextEventTimer()) != null) {
- hasFired = true;
- fireTimer(timer, runner);
- }
- while ((timer = timerInternals.removeNextProcessingTimer()) != null) {
- hasFired = true;
- fireTimer(timer, runner);
- }
- while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) {
- hasFired = true;
- fireTimer(timer, runner);
- }
- if (!hasFired) {
- break;
- }
- }
- }
-
- private void fireTimer(
- TimerInternals.TimerData timer, DoFnRunner<KV<K, V>, OutputT> doFnRunner) {
- StateNamespace namespace = timer.getNamespace();
- checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
- BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
- doFnRunner.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain());
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- doFnInvoker = DoFnInvokers.invokerFor(dofn);
- doFnInvoker.invokeSetup();
- }
-
- @Override
- public void close() throws Exception {
- doFnInvoker.invokeTeardown();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java
deleted file mode 100644
index 12222b4..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
-
-/**
- * {@link BroadcastVariableInitializer} that initializes the broadcast input as a {@code Map}
- * from window to side input.
- */
-public class SideInputInitializer<ElemT, ViewT, W extends BoundedWindow>
- implements BroadcastVariableInitializer<WindowedValue<ElemT>, Map<BoundedWindow, ViewT>> {
-
- PCollectionView<ViewT> view;
-
- public SideInputInitializer(PCollectionView<ViewT> view) {
- this.view = view;
- }
-
- @Override
- public Map<BoundedWindow, ViewT> initializeBroadcastVariable(
- Iterable<WindowedValue<ElemT>> inputValues) {
-
- // first partition into windows
- Map<BoundedWindow, List<WindowedValue<ElemT>>> partitionedElements = new HashMap<>();
- for (WindowedValue<ElemT> value: inputValues) {
- for (BoundedWindow window: value.getWindows()) {
- List<WindowedValue<ElemT>> windowedValues = partitionedElements.get(window);
- if (windowedValues == null) {
- windowedValues = new ArrayList<>();
- partitionedElements.put(window, windowedValues);
- }
- windowedValues.add(value);
- }
- }
-
- Map<BoundedWindow, ViewT> resultMap = new HashMap<>();
-
- for (Map.Entry<BoundedWindow, List<WindowedValue<ElemT>>> elements:
- partitionedElements.entrySet()) {
-
- @SuppressWarnings("unchecked")
- Iterable<WindowedValue<?>> elementsIterable =
- (List<WindowedValue<?>>) (List<?>) elements.getValue();
-
- resultMap.put(elements.getKey(), view.getViewFn().apply(elementsIterable));
- }
-
- return resultMap;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/package-info.java
deleted file mode 100644
index 9f11212..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Internal implementation of the Beam runner for Apache Flink.
- */
-package org.apache.beam.runners.flink.translation.functions;