You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/19 19:15:09 UTC
[35/50] [abbrv] beam git commit: [BEAM-1994] Remove Flink examples
package
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
deleted file mode 100644
index 9a52330..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.ImmutableList;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.List;
-import org.apache.beam.runners.core.KeyedWorkItem;
-import org.apache.beam.runners.core.KeyedWorkItemCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.StandardCoder;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.PropertyNames;
-import org.apache.beam.sdk.util.WindowedValue;
-
-/**
- * Singleton keyed work item coder.
- */
-public class SingletonKeyedWorkItemCoder<K, ElemT>
- extends StandardCoder<SingletonKeyedWorkItem<K, ElemT>> {
- /**
- * Create a new {@link KeyedWorkItemCoder} with the provided key coder, element coder, and window
- * coder.
- */
- public static <K, ElemT> SingletonKeyedWorkItemCoder<K, ElemT> of(
- Coder<K> keyCoder, Coder<ElemT> elemCoder, Coder<? extends BoundedWindow> windowCoder) {
- return new SingletonKeyedWorkItemCoder<>(keyCoder, elemCoder, windowCoder);
- }
-
- @JsonCreator
- public static <K, ElemT> SingletonKeyedWorkItemCoder<K, ElemT> of(
- @JsonProperty(PropertyNames.COMPONENT_ENCODINGS) List<Coder<?>> components) {
- checkArgument(components.size() == 3, "Expecting 3 components, got %s", components.size());
- @SuppressWarnings("unchecked")
- Coder<K> keyCoder = (Coder<K>) components.get(0);
- @SuppressWarnings("unchecked")
- Coder<ElemT> elemCoder = (Coder<ElemT>) components.get(1);
- @SuppressWarnings("unchecked")
- Coder<? extends BoundedWindow> windowCoder = (Coder<? extends BoundedWindow>) components.get(2);
- return new SingletonKeyedWorkItemCoder<>(keyCoder, elemCoder, windowCoder);
- }
-
- private final Coder<K> keyCoder;
- private final Coder<ElemT> elemCoder;
- private final Coder<? extends BoundedWindow> windowCoder;
- private final WindowedValue.FullWindowedValueCoder<ElemT> valueCoder;
-
- private SingletonKeyedWorkItemCoder(
- Coder<K> keyCoder, Coder<ElemT> elemCoder, Coder<? extends BoundedWindow> windowCoder) {
- this.keyCoder = keyCoder;
- this.elemCoder = elemCoder;
- this.windowCoder = windowCoder;
- valueCoder = WindowedValue.FullWindowedValueCoder.of(elemCoder, windowCoder);
- }
-
- public Coder<K> getKeyCoder() {
- return keyCoder;
- }
-
- public Coder<ElemT> getElementCoder() {
- return elemCoder;
- }
-
- @Override
- public void encode(SingletonKeyedWorkItem<K, ElemT> value,
- OutputStream outStream,
- Context context)
- throws CoderException, IOException {
- keyCoder.encode(value.key(), outStream, context.nested());
- valueCoder.encode(value.value, outStream, context);
- }
-
- @Override
- public SingletonKeyedWorkItem<K, ElemT> decode(InputStream inStream, Context context)
- throws CoderException, IOException {
- K key = keyCoder.decode(inStream, context.nested());
- WindowedValue<ElemT> value = valueCoder.decode(inStream, context);
- return new SingletonKeyedWorkItem<>(key, value);
- }
-
- @Override
- public List<? extends Coder<?>> getCoderArguments() {
- return ImmutableList.of(keyCoder, elemCoder, windowCoder);
- }
-
- @Override
- public void verifyDeterministic() throws NonDeterministicException {
- keyCoder.verifyDeterministic();
- elemCoder.verifyDeterministic();
- windowCoder.verifyDeterministic();
- }
-
- /**
- * {@inheritDoc}.
- *
- * {@link KeyedWorkItemCoder} is not consistent with equals as it can return a
- * {@link KeyedWorkItem} of a type different from the originally encoded type.
- */
- @Override
- public boolean consistentWithEquals() {
- return false;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
deleted file mode 100644
index 40f70e4..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import org.apache.beam.runners.core.ElementAndRestriction;
-import org.apache.beam.runners.core.KeyedWorkItem;
-import org.apache.beam.runners.core.KeyedWorkItems;
-import org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker;
-import org.apache.beam.runners.core.OutputWindowedValue;
-import org.apache.beam.runners.core.SplittableParDo;
-import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.StateInternalsFactory;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.core.TimerInternalsFactory;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.flink.streaming.api.operators.InternalTimer;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/**
- * Flink operator for executing splittable {@link DoFn DoFns}. Specifically, for executing
- * the {@code @ProcessElement} method of a splittable {@link DoFn}.
- */
-public class SplittableDoFnOperator<
- InputT, FnOutputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
- extends DoFnOperator<
- KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, FnOutputT, OutputT> {
-
- public SplittableDoFnOperator(
- DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, FnOutputT> doFn,
- Coder<
- WindowedValue<
- KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>> inputCoder,
- TupleTag<FnOutputT> mainOutputTag,
- List<TupleTag<?>> additionalOutputTags,
- OutputManagerFactory<OutputT> outputManagerFactory,
- WindowingStrategy<?, ?> windowingStrategy,
- Map<Integer, PCollectionView<?>> sideInputTagMapping,
- Collection<PCollectionView<?>> sideInputs,
- PipelineOptions options,
- Coder<?> keyCoder) {
- super(
- doFn,
- inputCoder,
- mainOutputTag,
- additionalOutputTags,
- outputManagerFactory,
- windowingStrategy,
- sideInputTagMapping,
- sideInputs,
- options,
- keyCoder);
-
- }
-
- @Override
- public void open() throws Exception {
- super.open();
-
- checkState(doFn instanceof SplittableParDo.ProcessFn);
-
- StateInternalsFactory<String> stateInternalsFactory = new StateInternalsFactory<String>() {
- @Override
- public StateInternals<String> stateInternalsForKey(String key) {
- //this will implicitly be keyed by the key of the incoming
- // element or by the key of a firing timer
- return (StateInternals<String>) stateInternals;
- }
- };
- TimerInternalsFactory<String> timerInternalsFactory = new TimerInternalsFactory<String>() {
- @Override
- public TimerInternals timerInternalsForKey(String key) {
- //this will implicitly be keyed like the StateInternalsFactory
- return timerInternals;
- }
- };
-
- ((SplittableParDo.ProcessFn) doFn).setStateInternalsFactory(stateInternalsFactory);
- ((SplittableParDo.ProcessFn) doFn).setTimerInternalsFactory(timerInternalsFactory);
- ((SplittableParDo.ProcessFn) doFn).setProcessElementInvoker(
- new OutputAndTimeBoundedSplittableProcessElementInvoker<>(
- doFn,
- serializedOptions.getPipelineOptions(),
- new OutputWindowedValue<FnOutputT>() {
- @Override
- public void outputWindowedValue(
- FnOutputT output,
- Instant timestamp,
- Collection<? extends BoundedWindow> windows,
- PaneInfo pane) {
- outputManager.output(
- mainOutputTag,
- WindowedValue.of(output, timestamp, windows, pane));
- }
-
- @Override
- public <AdditionalOutputT> void outputWindowedValue(
- TupleTag<AdditionalOutputT> tag,
- AdditionalOutputT output,
- Instant timestamp,
- Collection<? extends BoundedWindow> windows,
- PaneInfo pane) {
- outputManager.output(tag, WindowedValue.of(output, timestamp, windows, pane));
- }
- },
- sideInputReader,
- Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory()),
- 10000,
- Duration.standardSeconds(10)));
- }
-
- @Override
- public void fireTimer(InternalTimer<?, TimerInternals.TimerData> timer) {
- doFnRunner.processElement(WindowedValue.valueInGlobalWindow(
- KeyedWorkItems.<String, ElementAndRestriction<InputT, RestrictionT>>timersWorkItem(
- (String) stateInternals.getKey(),
- Collections.singletonList(timer.getNamespace()))));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
deleted file mode 100644
index 9b2136c..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming;
-
-import static org.apache.beam.runners.core.TimerInternals.TimerData;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn;
-import org.apache.beam.runners.core.KeyedWorkItem;
-import org.apache.beam.runners.core.KeyedWorkItems;
-import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.StateInternalsFactory;
-import org.apache.beam.runners.core.SystemReduceFn;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.core.TimerInternalsFactory;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.flink.streaming.api.operators.InternalTimer;
-
-/**
- * Flink operator for executing window {@link DoFn DoFns}.
- */
-public class WindowDoFnOperator<K, InputT, OutputT>
- extends DoFnOperator<KeyedWorkItem<K, InputT>, KV<K, OutputT>, WindowedValue<KV<K, OutputT>>> {
-
- private final SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> systemReduceFn;
-
- public WindowDoFnOperator(
- SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> systemReduceFn,
- Coder<WindowedValue<KeyedWorkItem<K, InputT>>> inputCoder,
- TupleTag<KV<K, OutputT>> mainOutputTag,
- List<TupleTag<?>> additionalOutputTags,
- OutputManagerFactory<WindowedValue<KV<K, OutputT>>> outputManagerFactory,
- WindowingStrategy<?, ?> windowingStrategy,
- Map<Integer, PCollectionView<?>> sideInputTagMapping,
- Collection<PCollectionView<?>> sideInputs,
- PipelineOptions options,
- Coder<K> keyCoder) {
- super(
- null,
- inputCoder,
- mainOutputTag,
- additionalOutputTags,
- outputManagerFactory,
- windowingStrategy,
- sideInputTagMapping,
- sideInputs,
- options,
- keyCoder);
-
- this.systemReduceFn = systemReduceFn;
-
- }
-
- @Override
- protected DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> getDoFn() {
- StateInternalsFactory<K> stateInternalsFactory = new StateInternalsFactory<K>() {
- @Override
- public StateInternals<K> stateInternalsForKey(K key) {
- //this will implicitly be keyed by the key of the incoming
- // element or by the key of a firing timer
- return (StateInternals<K>) stateInternals;
- }
- };
- TimerInternalsFactory<K> timerInternalsFactory = new TimerInternalsFactory<K>() {
- @Override
- public TimerInternals timerInternalsForKey(K key) {
- //this will implicitly be keyed like the StateInternalsFactory
- return timerInternals;
- }
- };
-
- // we have to do the unchecked cast because GroupAlsoByWindowViaWindowSetDoFn.create
- // has the window type as generic parameter while WindowingStrategy is almost always
- // untyped.
- @SuppressWarnings("unchecked")
- DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFn =
- GroupAlsoByWindowViaWindowSetNewDoFn.create(
- windowingStrategy, stateInternalsFactory, timerInternalsFactory, sideInputReader,
- (SystemReduceFn) systemReduceFn, outputManager, mainOutputTag);
- return doFn;
- }
-
- @Override
- public void fireTimer(InternalTimer<?, TimerData> timer) {
- doFnRunner.processElement(WindowedValue.valueInGlobalWindow(
- KeyedWorkItems.<K, InputT>timersWorkItem(
- (K) stateInternals.getKey(),
- Collections.singletonList(timer.getNamespace()))));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java
deleted file mode 100644
index 1dff367..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming;
-
-import java.nio.ByteBuffer;
-import org.apache.beam.runners.core.KeyedWorkItem;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-
-/**
- * {@link KeySelector} that retrieves a key from a {@link KeyedWorkItem}. This will return
- * the key as encoded by the provided {@link Coder} in a {@link ByteBuffer}. This ensures
- * that all key comparisons/hashing happen on the encoded form.
- */
-public class WorkItemKeySelector<K, V>
- implements KeySelector<WindowedValue<SingletonKeyedWorkItem<K, V>>, ByteBuffer>,
- ResultTypeQueryable<ByteBuffer> {
-
- private final Coder<K> keyCoder;
-
- public WorkItemKeySelector(Coder<K> keyCoder) {
- this.keyCoder = keyCoder;
- }
-
- @Override
- public ByteBuffer getKey(WindowedValue<SingletonKeyedWorkItem<K, V>> value) throws Exception {
- K key = value.getValue().key();
- byte[] keyBytes = CoderUtils.encodeToByteArray(keyCoder, key);
- return ByteBuffer.wrap(keyBytes);
- }
-
- @Override
- public TypeInformation<ByteBuffer> getProducedType() {
- return new GenericTypeInfo<>(ByteBuffer.class);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
deleted file mode 100644
index 2ed5024..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.flink.api.common.functions.StoppableFunction;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Wrapper for executing {@link BoundedSource BoundedSources} as a Flink Source.
- */
-public class BoundedSourceWrapper<OutputT>
- extends RichParallelSourceFunction<WindowedValue<OutputT>>
- implements StoppableFunction {
-
- private static final Logger LOG = LoggerFactory.getLogger(BoundedSourceWrapper.class);
-
- /**
- * Keep the options so that we can initialize the readers.
- */
- private final SerializedPipelineOptions serializedOptions;
-
- /**
- * The split sources. We split them in the constructor to ensure that all parallel
- * sources are consistent about the split sources.
- */
- private List<? extends BoundedSource<OutputT>> splitSources;
-
- /**
- * Make it a field so that we can access it in {@link #close()}.
- */
- private transient List<BoundedSource.BoundedReader<OutputT>> readers;
-
- /**
- * Initialize here and not in run() to prevent races where we cancel a job before run() is
- * ever called or run() is called after cancel().
- */
- private volatile boolean isRunning = true;
-
- @SuppressWarnings("unchecked")
- public BoundedSourceWrapper(
- PipelineOptions pipelineOptions,
- BoundedSource<OutputT> source,
- int parallelism) throws Exception {
- this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
-
- long desiredBundleSize = source.getEstimatedSizeBytes(pipelineOptions) / parallelism;
-
- // get the splits early. we assume that the generated splits are stable,
- // this is necessary so that the mapping of state to source is correct
- // when restoring
- splitSources = source.split(desiredBundleSize, pipelineOptions);
- }
-
- @Override
- public void run(SourceContext<WindowedValue<OutputT>> ctx) throws Exception {
-
- // figure out which split sources we're responsible for
- int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
- int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
-
- List<BoundedSource<OutputT>> localSources = new ArrayList<>();
-
- for (int i = 0; i < splitSources.size(); i++) {
- if (i % numSubtasks == subtaskIndex) {
- localSources.add(splitSources.get(i));
- }
- }
-
- LOG.info("Bounded Flink Source {}/{} is reading from sources: {}",
- subtaskIndex,
- numSubtasks,
- localSources);
-
- readers = new ArrayList<>();
- // initialize readers from scratch
- for (BoundedSource<OutputT> source : localSources) {
- readers.add(source.createReader(serializedOptions.getPipelineOptions()));
- }
-
- if (readers.size() == 1) {
- // the easy case, we just read from one reader
- BoundedSource.BoundedReader<OutputT> reader = readers.get(0);
-
- boolean dataAvailable = reader.start();
- if (dataAvailable) {
- emitElement(ctx, reader);
- }
-
- while (isRunning) {
- dataAvailable = reader.advance();
-
- if (dataAvailable) {
- emitElement(ctx, reader);
- } else {
- break;
- }
- }
- } else {
- // a bit more complicated, we are responsible for several readers
- // loop through them and sleep if none of them had any data
-
- int currentReader = 0;
-
- // start each reader and emit data if immediately available
- for (BoundedSource.BoundedReader<OutputT> reader : readers) {
- boolean dataAvailable = reader.start();
- if (dataAvailable) {
- emitElement(ctx, reader);
- }
- }
-
- // a flag telling us whether any of the readers had data
- // if no reader had data, sleep for bit
- boolean hadData = false;
- while (isRunning && !readers.isEmpty()) {
- BoundedSource.BoundedReader<OutputT> reader = readers.get(currentReader);
- boolean dataAvailable = reader.advance();
-
- if (dataAvailable) {
- emitElement(ctx, reader);
- hadData = true;
- } else {
- readers.remove(currentReader);
- currentReader--;
- if (readers.isEmpty()) {
- break;
- }
- }
-
- currentReader = (currentReader + 1) % readers.size();
- if (currentReader == 0 && !hadData) {
- Thread.sleep(50);
- } else if (currentReader == 0) {
- hadData = false;
- }
- }
-
- }
-
- // emit final Long.MAX_VALUE watermark, just to be sure
- ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
- }
-
- /**
- * Emit the current element from the given Reader. The reader is guaranteed to have data.
- */
- private void emitElement(
- SourceContext<WindowedValue<OutputT>> ctx,
- BoundedSource.BoundedReader<OutputT> reader) {
- // make sure that reader state update and element emission are atomic
- // with respect to snapshots
- synchronized (ctx.getCheckpointLock()) {
-
- OutputT item = reader.getCurrent();
- Instant timestamp = reader.getCurrentTimestamp();
-
- WindowedValue<OutputT> windowedValue =
- WindowedValue.of(item, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
- ctx.collectWithTimestamp(windowedValue, timestamp.getMillis());
- }
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- if (readers != null) {
- for (BoundedSource.BoundedReader<OutputT> reader: readers) {
- reader.close();
- }
- }
- }
-
- @Override
- public void cancel() {
- isRunning = false;
- }
-
- @Override
- public void stop() {
- this.isRunning = false;
- }
-
- /**
- * Visible so that we can check this in tests. Must not be used for anything else.
- */
- @VisibleForTesting
- public List<? extends BoundedSource<OutputT>> getSplitSources() {
- return splitSources;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
deleted file mode 100644
index 910a33f..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.Collections;
-import java.util.List;
-import java.util.NoSuchElementException;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * An example unbounded Beam source that reads input from a socket.
- * This is used mainly for testing and debugging.
- * */
-public class UnboundedSocketSource<CheckpointMarkT extends UnboundedSource.CheckpointMark>
- extends UnboundedSource<String, CheckpointMarkT> {
-
- private static final Coder<String> DEFAULT_SOCKET_CODER = StringUtf8Coder.of();
-
- private static final long serialVersionUID = 1L;
-
- private static final int DEFAULT_CONNECTION_RETRY_SLEEP = 500;
-
- private static final int CONNECTION_TIMEOUT_TIME = 0;
-
- private final String hostname;
- private final int port;
- private final char delimiter;
- private final long maxNumRetries;
- private final long delayBetweenRetries;
-
- public UnboundedSocketSource(String hostname, int port, char delimiter, long maxNumRetries) {
- this(hostname, port, delimiter, maxNumRetries, DEFAULT_CONNECTION_RETRY_SLEEP);
- }
-
- public UnboundedSocketSource(String hostname,
- int port,
- char delimiter,
- long maxNumRetries,
- long delayBetweenRetries) {
- this.hostname = hostname;
- this.port = port;
- this.delimiter = delimiter;
- this.maxNumRetries = maxNumRetries;
- this.delayBetweenRetries = delayBetweenRetries;
- }
-
- public String getHostname() {
- return this.hostname;
- }
-
- public int getPort() {
- return this.port;
- }
-
- public char getDelimiter() {
- return this.delimiter;
- }
-
- public long getMaxNumRetries() {
- return this.maxNumRetries;
- }
-
- public long getDelayBetweenRetries() {
- return this.delayBetweenRetries;
- }
-
- @Override
- public List<? extends UnboundedSource<String, CheckpointMarkT>> split(
- int desiredNumSplits,
- PipelineOptions options) throws Exception {
- return Collections.<UnboundedSource<String, CheckpointMarkT>>singletonList(this);
- }
-
- @Override
- public UnboundedReader<String> createReader(PipelineOptions options,
- @Nullable CheckpointMarkT checkpointMark) {
- return new UnboundedSocketReader(this);
- }
-
- @Nullable
- @Override
- public Coder getCheckpointMarkCoder() {
- // Flink and Dataflow have different checkpointing mechanisms.
- // In our case we do not need a coder.
- return null;
- }
-
- @Override
- public void validate() {
- checkArgument(port > 0 && port < 65536, "port is out of range");
- checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero or larger (num retries), "
- + "or -1 (infinite retries)");
- checkArgument(delayBetweenRetries >= 0, "delayBetweenRetries must be zero or positive");
- }
-
- @Override
- public Coder getDefaultOutputCoder() {
- return DEFAULT_SOCKET_CODER;
- }
-
- /**
- * Unbounded socket reader.
- */
- public static class UnboundedSocketReader extends UnboundedSource.UnboundedReader<String> {
-
- private static final Logger LOG = LoggerFactory.getLogger(UnboundedSocketReader.class);
-
- private final UnboundedSocketSource source;
-
- private Socket socket;
- private BufferedReader reader;
-
- private boolean isRunning;
-
- private String currentRecord;
-
- public UnboundedSocketReader(UnboundedSocketSource source) {
- this.source = source;
- }
-
- private void openConnection() throws IOException {
- this.socket = new Socket();
- this.socket.connect(new InetSocketAddress(this.source.getHostname(), this.source.getPort()),
- CONNECTION_TIMEOUT_TIME);
- this.reader = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
- this.isRunning = true;
- }
-
- @Override
- public boolean start() throws IOException {
- int attempt = 0;
- while (!isRunning) {
- try {
- openConnection();
- LOG.info("Connected to server socket " + this.source.getHostname() + ':'
- + this.source.getPort());
-
- return advance();
- } catch (IOException e) {
- LOG.info("Lost connection to server socket " + this.source.getHostname() + ':'
- + this.source.getPort() + ". Retrying in "
- + this.source.getDelayBetweenRetries() + " msecs...");
-
- if (this.source.getMaxNumRetries() == -1 || attempt++ < this.source.getMaxNumRetries()) {
- try {
- Thread.sleep(this.source.getDelayBetweenRetries());
- } catch (InterruptedException e1) {
- e1.printStackTrace();
- }
- } else {
- this.isRunning = false;
- break;
- }
- }
- }
- LOG.error("Unable to connect to host " + this.source.getHostname()
- + " : " + this.source.getPort());
- return false;
- }
-
- @Override
- public boolean advance() throws IOException {
- final StringBuilder buffer = new StringBuilder();
- int data;
- while (isRunning && (data = reader.read()) != -1) {
- // check if the string is complete
- if (data != this.source.getDelimiter()) {
- buffer.append((char) data);
- } else {
- if (buffer.length() > 0 && buffer.charAt(buffer.length() - 1) == '\r') {
- buffer.setLength(buffer.length() - 1);
- }
- this.currentRecord = buffer.toString();
- buffer.setLength(0);
- return true;
- }
- }
- return false;
- }
-
- @Override
- public byte[] getCurrentRecordId() throws NoSuchElementException {
- return new byte[0];
- }
-
- @Override
- public String getCurrent() throws NoSuchElementException {
- return this.currentRecord;
- }
-
- @Override
- public Instant getCurrentTimestamp() throws NoSuchElementException {
- return Instant.now();
- }
-
- @Override
- public void close() throws IOException {
- this.reader.close();
- this.socket.close();
- this.isRunning = false;
- LOG.info("Closed connection to server socket at " + this.source.getHostname() + ":"
- + this.source.getPort() + ".");
- }
-
- @Override
- public Instant getWatermark() {
- return Instant.now();
- }
-
- @Override
- public CheckpointMark getCheckpointMark() {
- return null;
- }
-
- @Override
- public UnboundedSource<String, ?> getCurrentSource() {
- return this.source;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
deleted file mode 100644
index bb9b58a..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ /dev/null
@@ -1,476 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
-import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.SerializableCoder;
-import org.apache.beam.sdk.io.UnboundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.StoppableFunction;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.OperatorStateStore;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.CheckpointListener;
-import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Wrapper for executing {@link UnboundedSource UnboundedSources} as a Flink Source.
- */
-public class UnboundedSourceWrapper<
- OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark>
- extends RichParallelSourceFunction<WindowedValue<OutputT>>
- implements ProcessingTimeCallback, StoppableFunction,
- CheckpointListener, CheckpointedFunction {
-
- private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceWrapper.class);
-
- /**
- * Keep the options so that we can initialize the localReaders.
- */
- private final SerializedPipelineOptions serializedOptions;
-
- /**
- * For snapshot and restore.
- */
- private final KvCoder<
- ? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT> checkpointCoder;
-
- /**
- * The split sources. We split them in the constructor to ensure that all parallel
- * sources are consistent about the split sources.
- */
- private final List<? extends UnboundedSource<OutputT, CheckpointMarkT>> splitSources;
-
- /**
- * The local split sources. Assigned at runtime when the wrapper is executed in parallel.
- */
- private transient List<UnboundedSource<OutputT, CheckpointMarkT>> localSplitSources;
-
- /**
- * The local split readers. Assigned at runtime when the wrapper is executed in parallel.
- * Make it a field so that we can access it in {@link #onProcessingTime(long)} for
- * emitting watermarks.
- */
- private transient List<UnboundedSource.UnboundedReader<OutputT>> localReaders;
-
- /**
- * Flag to indicate whether the source is running.
- * Initialize here and not in run() to prevent races where we cancel a job before run() is
- * ever called or run() is called after cancel().
- */
- private volatile boolean isRunning = true;
-
- /**
- * Make it a field so that we can access it in {@link #onProcessingTime(long)} for registering new
- * triggers.
- */
- private transient StreamingRuntimeContext runtimeContext;
-
- /**
- * Make it a field so that we can access it in {@link #onProcessingTime(long)} for emitting
- * watermarks.
- */
- private transient SourceContext<WindowedValue<OutputT>> context;
-
- /**
- * Pending checkpoints which have not been acknowledged yet.
- */
- private transient LinkedHashMap<Long, List<CheckpointMarkT>> pendingCheckpoints;
- /**
- * Keep a maximum of 32 checkpoints for {@code CheckpointMark.finalizeCheckpoint()}.
- */
- private static final int MAX_NUMBER_PENDING_CHECKPOINTS = 32;
-
- private transient ListState<KV<? extends
- UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT>> stateForCheckpoint;
-
- /**
- * false if checkpointCoder is null or no restore state by starting first.
- */
- private transient boolean isRestored = false;
-
- @SuppressWarnings("unchecked")
- public UnboundedSourceWrapper(
- PipelineOptions pipelineOptions,
- UnboundedSource<OutputT, CheckpointMarkT> source,
- int parallelism) throws Exception {
- this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
-
- if (source.requiresDeduping()) {
- LOG.warn("Source {} requires deduping but Flink runner doesn't support this yet.", source);
- }
-
- Coder<CheckpointMarkT> checkpointMarkCoder = source.getCheckpointMarkCoder();
- if (checkpointMarkCoder == null) {
- LOG.info("No CheckpointMarkCoder specified for this source. Won't create snapshots.");
- checkpointCoder = null;
- } else {
-
- Coder<? extends UnboundedSource<OutputT, CheckpointMarkT>> sourceCoder =
- (Coder) SerializableCoder.of(new TypeDescriptor<UnboundedSource>() {
- });
-
- checkpointCoder = KvCoder.of(sourceCoder, checkpointMarkCoder);
- }
-
- // get the splits early. we assume that the generated splits are stable,
- // this is necessary so that the mapping of state to source is correct
- // when restoring
- splitSources = source.split(parallelism, pipelineOptions);
- }
-
-
- /**
- * Initialize and restore state before starting execution of the source.
- */
- @Override
- public void open(Configuration parameters) throws Exception {
- runtimeContext = (StreamingRuntimeContext) getRuntimeContext();
-
- // figure out which split sources we're responsible for
- int subtaskIndex = runtimeContext.getIndexOfThisSubtask();
- int numSubtasks = runtimeContext.getNumberOfParallelSubtasks();
-
- localSplitSources = new ArrayList<>();
- localReaders = new ArrayList<>();
-
- pendingCheckpoints = new LinkedHashMap<>();
-
- if (isRestored) {
- // restore the splitSources from the checkpoint to ensure consistent ordering
- for (KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT> restored:
- stateForCheckpoint.get()) {
- localSplitSources.add(restored.getKey());
- localReaders.add(restored.getKey().createReader(
- serializedOptions.getPipelineOptions(), restored.getValue()));
- }
- } else {
- // initialize localReaders and localSources from scratch
- for (int i = 0; i < splitSources.size(); i++) {
- if (i % numSubtasks == subtaskIndex) {
- UnboundedSource<OutputT, CheckpointMarkT> source =
- splitSources.get(i);
- UnboundedSource.UnboundedReader<OutputT> reader =
- source.createReader(serializedOptions.getPipelineOptions(), null);
- localSplitSources.add(source);
- localReaders.add(reader);
- }
- }
- }
-
- LOG.info("Unbounded Flink Source {}/{} is reading from sources: {}",
- subtaskIndex,
- numSubtasks,
- localSplitSources);
- }
-
- @Override
- public void run(SourceContext<WindowedValue<OutputT>> ctx) throws Exception {
-
- context = ctx;
-
- if (localReaders.size() == 0) {
- // do nothing, but still look busy ...
- // also, output a Long.MAX_VALUE watermark since we know that we're not
- // going to emit anything
- // we can't return here since Flink requires that all operators stay up,
- // otherwise checkpointing would not work correctly anymore
- ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
-
- // wait until this is canceled
- final Object waitLock = new Object();
- while (isRunning) {
- try {
- // Flink will interrupt us at some point
- //noinspection SynchronizationOnLocalVariableOrMethodParameter
- synchronized (waitLock) {
- // don't wait indefinitely, in case something goes horribly wrong
- waitLock.wait(1000);
- }
- } catch (InterruptedException e) {
- if (!isRunning) {
- // restore the interrupted state, and fall through the loop
- Thread.currentThread().interrupt();
- }
- }
- }
- } else if (localReaders.size() == 1) {
- // the easy case, we just read from one reader
- UnboundedSource.UnboundedReader<OutputT> reader = localReaders.get(0);
-
- boolean dataAvailable = reader.start();
- if (dataAvailable) {
- emitElement(ctx, reader);
- }
-
- setNextWatermarkTimer(this.runtimeContext);
-
- while (isRunning) {
- dataAvailable = reader.advance();
-
- if (dataAvailable) {
- emitElement(ctx, reader);
- } else {
- Thread.sleep(50);
- }
- }
- } else {
- // a bit more complicated, we are responsible for several localReaders
- // loop through them and sleep if none of them had any data
-
- int numReaders = localReaders.size();
- int currentReader = 0;
-
- // start each reader and emit data if immediately available
- for (UnboundedSource.UnboundedReader<OutputT> reader : localReaders) {
- boolean dataAvailable = reader.start();
- if (dataAvailable) {
- emitElement(ctx, reader);
- }
- }
-
- // a flag telling us whether any of the localReaders had data
- // if no reader had data, sleep for bit
- boolean hadData = false;
- while (isRunning) {
- UnboundedSource.UnboundedReader<OutputT> reader = localReaders.get(currentReader);
- boolean dataAvailable = reader.advance();
-
- if (dataAvailable) {
- emitElement(ctx, reader);
- hadData = true;
- }
-
- currentReader = (currentReader + 1) % numReaders;
- if (currentReader == 0 && !hadData) {
- Thread.sleep(50);
- } else if (currentReader == 0) {
- hadData = false;
- }
- }
-
- }
- }
-
- /**
- * Emit the current element from the given Reader. The reader is guaranteed to have data.
- */
- private void emitElement(
- SourceContext<WindowedValue<OutputT>> ctx,
- UnboundedSource.UnboundedReader<OutputT> reader) {
- // make sure that reader state update and element emission are atomic
- // with respect to snapshots
- synchronized (ctx.getCheckpointLock()) {
-
- OutputT item = reader.getCurrent();
- Instant timestamp = reader.getCurrentTimestamp();
-
- WindowedValue<OutputT> windowedValue =
- WindowedValue.of(item, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
- ctx.collectWithTimestamp(windowedValue, timestamp.getMillis());
- }
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- if (localReaders != null) {
- for (UnboundedSource.UnboundedReader<OutputT> reader: localReaders) {
- reader.close();
- }
- }
- }
-
- @Override
- public void cancel() {
- isRunning = false;
- }
-
- @Override
- public void stop() {
- isRunning = false;
- }
-
- // ------------------------------------------------------------------------
- // Checkpoint and restore
- // ------------------------------------------------------------------------
-
- @Override
- public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
- if (!isRunning) {
- LOG.debug("snapshotState() called on closed source");
- } else {
-
- if (checkpointCoder == null) {
- // no checkpoint coder available in this source
- return;
- }
-
- stateForCheckpoint.clear();
-
- long checkpointId = functionSnapshotContext.getCheckpointId();
-
- // we checkpoint the sources along with the CheckpointMarkT to ensure
- // than we have a correct mapping of checkpoints to sources when
- // restoring
- List<CheckpointMarkT> checkpointMarks = new ArrayList<>(localSplitSources.size());
-
- for (int i = 0; i < localSplitSources.size(); i++) {
- UnboundedSource<OutputT, CheckpointMarkT> source = localSplitSources.get(i);
- UnboundedSource.UnboundedReader<OutputT> reader = localReaders.get(i);
-
- @SuppressWarnings("unchecked")
- CheckpointMarkT mark = (CheckpointMarkT) reader.getCheckpointMark();
- checkpointMarks.add(mark);
- KV<UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT> kv =
- KV.of(source, mark);
- stateForCheckpoint.add(kv);
- }
-
- // cleanup old pending checkpoints and add new checkpoint
- int diff = pendingCheckpoints.size() - MAX_NUMBER_PENDING_CHECKPOINTS;
- if (diff >= 0) {
- for (Iterator<Long> iterator = pendingCheckpoints.keySet().iterator();
- diff >= 0;
- diff--) {
- iterator.next();
- iterator.remove();
- }
- }
- pendingCheckpoints.put(checkpointId, checkpointMarks);
-
- }
- }
-
- @Override
- public void initializeState(FunctionInitializationContext context) throws Exception {
- if (checkpointCoder == null) {
- // no checkpoint coder available in this source
- return;
- }
-
- OperatorStateStore stateStore = context.getOperatorStateStore();
- CoderTypeInformation<
- KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT>>
- typeInformation = (CoderTypeInformation) new CoderTypeInformation<>(checkpointCoder);
- stateForCheckpoint = stateStore.getOperatorState(
- new ListStateDescriptor<>(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME,
- typeInformation.createSerializer(new ExecutionConfig())));
-
- if (context.isRestored()) {
- isRestored = true;
- LOG.info("Having restore state in the UnbounedSourceWrapper.");
- } else {
- LOG.info("No restore state for UnbounedSourceWrapper.");
- }
- }
-
- @Override
- public void onProcessingTime(long timestamp) throws Exception {
- if (this.isRunning) {
- synchronized (context.getCheckpointLock()) {
- // find minimum watermark over all localReaders
- long watermarkMillis = Long.MAX_VALUE;
- for (UnboundedSource.UnboundedReader<OutputT> reader: localReaders) {
- Instant watermark = reader.getWatermark();
- if (watermark != null) {
- watermarkMillis = Math.min(watermark.getMillis(), watermarkMillis);
- }
- }
- context.emitWatermark(new Watermark(watermarkMillis));
- }
- setNextWatermarkTimer(this.runtimeContext);
- }
- }
-
- private void setNextWatermarkTimer(StreamingRuntimeContext runtime) {
- if (this.isRunning) {
- long watermarkInterval = runtime.getExecutionConfig().getAutoWatermarkInterval();
- long timeToNextWatermark = getTimeToNextWatermark(watermarkInterval);
- runtime.getProcessingTimeService().registerTimer(timeToNextWatermark, this);
- }
- }
-
- private long getTimeToNextWatermark(long watermarkInterval) {
- return System.currentTimeMillis() + watermarkInterval;
- }
-
- /**
- * Visible so that we can check this in tests. Must not be used for anything else.
- */
- @VisibleForTesting
- public List<? extends UnboundedSource<OutputT, CheckpointMarkT>> getSplitSources() {
- return splitSources;
- }
-
- /**
- * Visible so that we can check this in tests. Must not be used for anything else.
- */
- @VisibleForTesting
- public List<? extends UnboundedSource<OutputT, CheckpointMarkT>> getLocalSplitSources() {
- return localSplitSources;
- }
-
- @Override
- public void notifyCheckpointComplete(long checkpointId) throws Exception {
-
- List<CheckpointMarkT> checkpointMarks = pendingCheckpoints.get(checkpointId);
-
- if (checkpointMarks != null) {
-
- // remove old checkpoints including the current one
- Iterator<Long> iterator = pendingCheckpoints.keySet().iterator();
- long currentId;
- do {
- currentId = iterator.next();
- iterator.remove();
- } while (currentId != checkpointId);
-
- // confirm all marks
- for (CheckpointMarkT mark : checkpointMarks) {
- mark.finalizeCheckpoint();
- }
-
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/package-info.java
deleted file mode 100644
index b431ce7..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Internal implementation of the Beam runner for Apache Flink.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/package-info.java
deleted file mode 100644
index 0674871..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Internal implementation of the Beam runner for Apache Flink.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming;
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
deleted file mode 100644
index 3203446..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkBroadcastStateInternals.java
+++ /dev/null
@@ -1,865 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateTag;
-import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.coders.MapCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.CombineWithContext;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.util.CombineContextFactory;
-import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.CombiningState;
-import org.apache.beam.sdk.util.state.MapState;
-import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.SetState;
-import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.util.state.StateContext;
-import org.apache.beam.sdk.util.state.StateContexts;
-import org.apache.beam.sdk.util.state.ValueState;
-import org.apache.beam.sdk.util.state.WatermarkHoldState;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
-import org.apache.flink.runtime.state.OperatorStateBackend;
-
-/**
- * {@link StateInternals} that uses a Flink {@link DefaultOperatorStateBackend}
- * to manage the broadcast state.
- * The state is the same on all parallel instances of the operator.
- * So we just need store state of operator-0 in OperatorStateBackend.
- *
- * <p>Note: Ignore index of key.
- * Mainly for SideInputs.
- */
-public class FlinkBroadcastStateInternals<K> implements StateInternals<K> {
-
- private int indexInSubtaskGroup;
- private final DefaultOperatorStateBackend stateBackend;
- // stateName -> <namespace, state>
- private Map<String, Map<String, ?>> stateForNonZeroOperator;
-
- public FlinkBroadcastStateInternals(int indexInSubtaskGroup, OperatorStateBackend stateBackend) {
- //TODO flink do not yet expose through public API
- this.stateBackend = (DefaultOperatorStateBackend) stateBackend;
- this.indexInSubtaskGroup = indexInSubtaskGroup;
- if (indexInSubtaskGroup != 0) {
- stateForNonZeroOperator = new HashMap<>();
- }
- }
-
- @Override
- public K getKey() {
- return null;
- }
-
- @Override
- public <T extends State> T state(
- final StateNamespace namespace,
- StateTag<? super K, T> address) {
-
- return state(namespace, address, StateContexts.nullContext());
- }
-
- @Override
- public <T extends State> T state(
- final StateNamespace namespace,
- StateTag<? super K, T> address,
- final StateContext<?> context) {
-
- return address.bind(new StateTag.StateBinder<K>() {
-
- @Override
- public <T> ValueState<T> bindValue(
- StateTag<? super K, ValueState<T>> address,
- Coder<T> coder) {
-
- return new FlinkBroadcastValueState<>(stateBackend, address, namespace, coder);
- }
-
- @Override
- public <T> BagState<T> bindBag(
- StateTag<? super K, BagState<T>> address,
- Coder<T> elemCoder) {
-
- return new FlinkBroadcastBagState<>(stateBackend, address, namespace, elemCoder);
- }
-
- @Override
- public <T> SetState<T> bindSet(
- StateTag<? super K, SetState<T>> address,
- Coder<T> elemCoder) {
- throw new UnsupportedOperationException(
- String.format("%s is not supported", SetState.class.getSimpleName()));
- }
-
- @Override
- public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
- StateTag<? super K, MapState<KeyT, ValueT>> spec,
- Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
- throw new UnsupportedOperationException(
- String.format("%s is not supported", MapState.class.getSimpleName()));
- }
-
- @Override
- public <InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT>
- bindCombiningValue(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
-
- return new FlinkCombiningState<>(
- stateBackend, address, combineFn, namespace, accumCoder);
- }
-
- @Override
- public <InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
- return new FlinkKeyedCombiningState<>(
- stateBackend,
- address,
- combineFn,
- namespace,
- accumCoder,
- FlinkBroadcastStateInternals.this);
- }
-
- @Override
- public <InputT, AccumT, OutputT>
- CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- Coder<AccumT> accumCoder,
- CombineWithContext.KeyedCombineFnWithContext<
- ? super K, InputT, AccumT, OutputT> combineFn) {
- return new FlinkCombiningStateWithContext<>(
- stateBackend,
- address,
- combineFn,
- namespace,
- accumCoder,
- FlinkBroadcastStateInternals.this,
- CombineContextFactory.createFromStateContext(context));
- }
-
- @Override
- public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
- StateTag<? super K, WatermarkHoldState<W>> address,
- OutputTimeFn<? super W> outputTimeFn) {
- throw new UnsupportedOperationException(
- String.format("%s is not supported", WatermarkHoldState.class.getSimpleName()));
- }
- });
- }
-
- /**
- * 1. The way we would use it is to only checkpoint anything from the operator
- * with subtask index 0 because we assume that the state is the same on all
- * parallel instances of the operator.
- *
- * <p>2. Use map to support namespace.
- */
- private abstract class AbstractBroadcastState<T> {
-
- private String name;
- private final StateNamespace namespace;
- private final ListStateDescriptor<Map<String, T>> flinkStateDescriptor;
- private final DefaultOperatorStateBackend flinkStateBackend;
-
- AbstractBroadcastState(
- DefaultOperatorStateBackend flinkStateBackend,
- String name,
- StateNamespace namespace,
- Coder<T> coder) {
- this.name = name;
-
- this.namespace = namespace;
- this.flinkStateBackend = flinkStateBackend;
-
- CoderTypeInformation<Map<String, T>> typeInfo =
- new CoderTypeInformation<>(MapCoder.of(StringUtf8Coder.of(), coder));
-
- flinkStateDescriptor = new ListStateDescriptor<>(name,
- typeInfo.createSerializer(new ExecutionConfig()));
- }
-
- /**
- * Get map(namespce->T) from index 0.
- */
- Map<String, T> getMap() throws Exception {
- if (indexInSubtaskGroup == 0) {
- return getMapFromBroadcastState();
- } else {
- Map<String, T> result = (Map<String, T>) stateForNonZeroOperator.get(name);
- // maybe restore from BroadcastState of Operator-0
- if (result == null) {
- result = getMapFromBroadcastState();
- if (result != null) {
- stateForNonZeroOperator.put(name, result);
- // we don't need it anymore, must clear it.
- flinkStateBackend.getBroadcastOperatorState(
- flinkStateDescriptor).clear();
- }
- }
- return result;
- }
- }
-
- Map<String, T> getMapFromBroadcastState() throws Exception {
- ListState<Map<String, T>> state = flinkStateBackend.getBroadcastOperatorState(
- flinkStateDescriptor);
- Iterable<Map<String, T>> iterable = state.get();
- Map<String, T> ret = null;
- if (iterable != null) {
- // just use index 0
- Iterator<Map<String, T>> iterator = iterable.iterator();
- if (iterator.hasNext()) {
- ret = iterator.next();
- }
- }
- return ret;
- }
-
- /**
- * Update map(namespce->T) from index 0.
- */
- void updateMap(Map<String, T> map) throws Exception {
- if (indexInSubtaskGroup == 0) {
- ListState<Map<String, T>> state = flinkStateBackend.getBroadcastOperatorState(
- flinkStateDescriptor);
- state.clear();
- if (map.size() > 0) {
- state.add(map);
- }
- } else {
- if (map.size() == 0) {
- stateForNonZeroOperator.remove(name);
- // updateMap is always behind getMap,
- // getMap will clear map in BroadcastOperatorState,
- // we don't need clear here.
- } else {
- stateForNonZeroOperator.put(name, map);
- }
- }
- }
-
- void writeInternal(T input) {
- try {
- Map<String, T> map = getMap();
- if (map == null) {
- map = new HashMap<>();
- }
- map.put(namespace.stringKey(), input);
- updateMap(map);
- } catch (Exception e) {
- throw new RuntimeException("Error updating state.", e);
- }
- }
-
- T readInternal() {
- try {
- Map<String, T> map = getMap();
- if (map == null) {
- return null;
- } else {
- return map.get(namespace.stringKey());
- }
- } catch (Exception e) {
- throw new RuntimeException("Error reading state.", e);
- }
- }
-
- void clearInternal() {
- try {
- Map<String, T> map = getMap();
- if (map != null) {
- map.remove(namespace.stringKey());
- updateMap(map);
- }
- } catch (Exception e) {
- throw new RuntimeException("Error clearing state.", e);
- }
- }
-
- }
-
- private class FlinkBroadcastValueState<K, T>
- extends AbstractBroadcastState<T> implements ValueState<T> {
-
- private final StateNamespace namespace;
- private final StateTag<? super K, ValueState<T>> address;
-
- FlinkBroadcastValueState(
- DefaultOperatorStateBackend flinkStateBackend,
- StateTag<? super K, ValueState<T>> address,
- StateNamespace namespace,
- Coder<T> coder) {
- super(flinkStateBackend, address.getId(), namespace, coder);
-
- this.namespace = namespace;
- this.address = address;
-
- }
-
- @Override
- public void write(T input) {
- writeInternal(input);
- }
-
- @Override
- public ValueState<T> readLater() {
- return this;
- }
-
- @Override
- public T read() {
- return readInternal();
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- FlinkBroadcastValueState<?, ?> that = (FlinkBroadcastValueState<?, ?>) o;
-
- return namespace.equals(that.namespace) && address.equals(that.address);
-
- }
-
- @Override
- public int hashCode() {
- int result = namespace.hashCode();
- result = 31 * result + address.hashCode();
- return result;
- }
-
- @Override
- public void clear() {
- clearInternal();
- }
- }
-
- private class FlinkBroadcastBagState<K, T> extends AbstractBroadcastState<List<T>>
- implements BagState<T> {
-
- private final StateNamespace namespace;
- private final StateTag<? super K, BagState<T>> address;
-
- FlinkBroadcastBagState(
- DefaultOperatorStateBackend flinkStateBackend,
- StateTag<? super K, BagState<T>> address,
- StateNamespace namespace,
- Coder<T> coder) {
- super(flinkStateBackend, address.getId(), namespace, ListCoder.of(coder));
-
- this.namespace = namespace;
- this.address = address;
- }
-
- @Override
- public void add(T input) {
- List<T> list = readInternal();
- if (list == null) {
- list = new ArrayList<>();
- }
- list.add(input);
- writeInternal(list);
- }
-
- @Override
- public BagState<T> readLater() {
- return this;
- }
-
- @Override
- public Iterable<T> read() {
- List<T> result = readInternal();
- return result != null ? result : Collections.<T>emptyList();
- }
-
- @Override
- public ReadableState<Boolean> isEmpty() {
- return new ReadableState<Boolean>() {
- @Override
- public Boolean read() {
- try {
- List<T> result = readInternal();
- return result == null;
- } catch (Exception e) {
- throw new RuntimeException("Error reading state.", e);
- }
-
- }
-
- @Override
- public ReadableState<Boolean> readLater() {
- return this;
- }
- };
- }
-
- @Override
- public void clear() {
- clearInternal();
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- FlinkBroadcastBagState<?, ?> that = (FlinkBroadcastBagState<?, ?>) o;
-
- return namespace.equals(that.namespace) && address.equals(that.address);
-
- }
-
- @Override
- public int hashCode() {
- int result = namespace.hashCode();
- result = 31 * result + address.hashCode();
- return result;
- }
- }
-
- private class FlinkCombiningState<K, InputT, AccumT, OutputT>
- extends AbstractBroadcastState<AccumT>
- implements CombiningState<InputT, AccumT, OutputT> {
-
- private final StateNamespace namespace;
- private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
- private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
-
- FlinkCombiningState(
- DefaultOperatorStateBackend flinkStateBackend,
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
- StateNamespace namespace,
- Coder<AccumT> accumCoder) {
- super(flinkStateBackend, address.getId(), namespace, accumCoder);
-
- this.namespace = namespace;
- this.address = address;
- this.combineFn = combineFn;
- }
-
- @Override
- public CombiningState<InputT, AccumT, OutputT> readLater() {
- return this;
- }
-
- @Override
- public void add(InputT value) {
- AccumT current = readInternal();
- if (current == null) {
- current = combineFn.createAccumulator();
- }
- current = combineFn.addInput(current, value);
- writeInternal(current);
- }
-
- @Override
- public void addAccum(AccumT accum) {
- AccumT current = readInternal();
-
- if (current == null) {
- writeInternal(accum);
- } else {
- current = combineFn.mergeAccumulators(Arrays.asList(current, accum));
- writeInternal(current);
- }
- }
-
- @Override
- public AccumT getAccum() {
- return readInternal();
- }
-
- @Override
- public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
- return combineFn.mergeAccumulators(accumulators);
- }
-
- @Override
- public OutputT read() {
- AccumT accum = readInternal();
- if (accum != null) {
- return combineFn.extractOutput(accum);
- } else {
- return combineFn.extractOutput(combineFn.createAccumulator());
- }
- }
-
- @Override
- public ReadableState<Boolean> isEmpty() {
- return new ReadableState<Boolean>() {
- @Override
- public Boolean read() {
- try {
- return readInternal() == null;
- } catch (Exception e) {
- throw new RuntimeException("Error reading state.", e);
- }
-
- }
-
- @Override
- public ReadableState<Boolean> readLater() {
- return this;
- }
- };
- }
-
- @Override
- public void clear() {
- clearInternal();
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- FlinkCombiningState<?, ?, ?, ?> that =
- (FlinkCombiningState<?, ?, ?, ?>) o;
-
- return namespace.equals(that.namespace) && address.equals(that.address);
-
- }
-
- @Override
- public int hashCode() {
- int result = namespace.hashCode();
- result = 31 * result + address.hashCode();
- return result;
- }
- }
-
- private class FlinkKeyedCombiningState<K, InputT, AccumT, OutputT>
- extends AbstractBroadcastState<AccumT>
- implements CombiningState<InputT, AccumT, OutputT> {
-
- private final StateNamespace namespace;
- private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
- private final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;
- private final FlinkBroadcastStateInternals<K> flinkStateInternals;
-
- FlinkKeyedCombiningState(
- DefaultOperatorStateBackend flinkStateBackend,
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn,
- StateNamespace namespace,
- Coder<AccumT> accumCoder,
- FlinkBroadcastStateInternals<K> flinkStateInternals) {
- super(flinkStateBackend, address.getId(), namespace, accumCoder);
-
- this.namespace = namespace;
- this.address = address;
- this.combineFn = combineFn;
- this.flinkStateInternals = flinkStateInternals;
-
- }
-
- @Override
- public CombiningState<InputT, AccumT, OutputT> readLater() {
- return this;
- }
-
- @Override
- public void add(InputT value) {
- try {
- AccumT current = readInternal();
- if (current == null) {
- current = combineFn.createAccumulator(flinkStateInternals.getKey());
- }
- current = combineFn.addInput(flinkStateInternals.getKey(), current, value);
- writeInternal(current);
- } catch (Exception e) {
- throw new RuntimeException("Error adding to state." , e);
- }
- }
-
- @Override
- public void addAccum(AccumT accum) {
- try {
- AccumT current = readInternal();
- if (current == null) {
- writeInternal(accum);
- } else {
- current = combineFn.mergeAccumulators(
- flinkStateInternals.getKey(),
- Arrays.asList(current, accum));
- writeInternal(current);
- }
- } catch (Exception e) {
- throw new RuntimeException("Error adding to state.", e);
- }
- }
-
- @Override
- public AccumT getAccum() {
- try {
- return readInternal();
- } catch (Exception e) {
- throw new RuntimeException("Error reading state.", e);
- }
- }
-
- @Override
- public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
- return combineFn.mergeAccumulators(flinkStateInternals.getKey(), accumulators);
- }
-
- @Override
- public OutputT read() {
- try {
- AccumT accum = readInternal();
- if (accum != null) {
- return combineFn.extractOutput(flinkStateInternals.getKey(), accum);
- } else {
- return combineFn.extractOutput(
- flinkStateInternals.getKey(),
- combineFn.createAccumulator(flinkStateInternals.getKey()));
- }
- } catch (Exception e) {
- throw new RuntimeException("Error reading state.", e);
- }
- }
-
- @Override
- public ReadableState<Boolean> isEmpty() {
- return new ReadableState<Boolean>() {
- @Override
- public Boolean read() {
- try {
- return readInternal() == null;
- } catch (Exception e) {
- throw new RuntimeException("Error reading state.", e);
- }
-
- }
-
- @Override
- public ReadableState<Boolean> readLater() {
- return this;
- }
- };
- }
-
- @Override
- public void clear() {
- clearInternal();
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- FlinkKeyedCombiningState<?, ?, ?, ?> that =
- (FlinkKeyedCombiningState<?, ?, ?, ?>) o;
-
- return namespace.equals(that.namespace) && address.equals(that.address);
-
- }
-
- @Override
- public int hashCode() {
- int result = namespace.hashCode();
- result = 31 * result + address.hashCode();
- return result;
- }
- }
-
- private class FlinkCombiningStateWithContext<K, InputT, AccumT, OutputT>
- extends AbstractBroadcastState<AccumT>
- implements CombiningState<InputT, AccumT, OutputT> {
-
- private final StateNamespace namespace;
- private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
- private final CombineWithContext.KeyedCombineFnWithContext<
- ? super K, InputT, AccumT, OutputT> combineFn;
- private final FlinkBroadcastStateInternals<K> flinkStateInternals;
- private final CombineWithContext.Context context;
-
- FlinkCombiningStateWithContext(
- DefaultOperatorStateBackend flinkStateBackend,
- StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
- CombineWithContext.KeyedCombineFnWithContext<
- ? super K, InputT, AccumT, OutputT> combineFn,
- StateNamespace namespace,
- Coder<AccumT> accumCoder,
- FlinkBroadcastStateInternals<K> flinkStateInternals,
- CombineWithContext.Context context) {
- super(flinkStateBackend, address.getId(), namespace, accumCoder);
-
- this.namespace = namespace;
- this.address = address;
- this.combineFn = combineFn;
- this.flinkStateInternals = flinkStateInternals;
- this.context = context;
-
- }
-
- @Override
- public CombiningState<InputT, AccumT, OutputT> readLater() {
- return this;
- }
-
- @Override
- public void add(InputT value) {
- try {
- AccumT current = readInternal();
- if (current == null) {
- current = combineFn.createAccumulator(flinkStateInternals.getKey(), context);
- }
- current = combineFn.addInput(flinkStateInternals.getKey(), current, value, context);
- writeInternal(current);
- } catch (Exception e) {
- throw new RuntimeException("Error adding to state." , e);
- }
- }
-
- @Override
- public void addAccum(AccumT accum) {
- try {
-
- AccumT current = readInternal();
- if (current == null) {
- writeInternal(accum);
- } else {
- current = combineFn.mergeAccumulators(
- flinkStateInternals.getKey(),
- Arrays.asList(current, accum),
- context);
- writeInternal(current);
- }
- } catch (Exception e) {
- throw new RuntimeException("Error adding to state.", e);
- }
- }
-
- @Override
- public AccumT getAccum() {
- try {
- return readInternal();
- } catch (Exception e) {
- throw new RuntimeException("Error reading state.", e);
- }
- }
-
- @Override
- public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
- return combineFn.mergeAccumulators(flinkStateInternals.getKey(), accumulators, context);
- }
-
- @Override
- public OutputT read() {
- try {
- AccumT accum = readInternal();
- return combineFn.extractOutput(flinkStateInternals.getKey(), accum, context);
- } catch (Exception e) {
- throw new RuntimeException("Error reading state.", e);
- }
- }
-
- @Override
- public ReadableState<Boolean> isEmpty() {
- return new ReadableState<Boolean>() {
- @Override
- public Boolean read() {
- try {
- return readInternal() == null;
- } catch (Exception e) {
- throw new RuntimeException("Error reading state.", e);
- }
-
- }
-
- @Override
- public ReadableState<Boolean> readLater() {
- return this;
- }
- };
- }
-
- @Override
- public void clear() {
- clearInternal();
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- FlinkCombiningStateWithContext<?, ?, ?, ?> that =
- (FlinkCombiningStateWithContext<?, ?, ?, ?>) o;
-
- return namespace.equals(that.namespace) && address.equals(that.address);
-
- }
-
- @Override
- public int hashCode() {
- int result = namespace.hashCode();
- result = 31 * result + address.hashCode();
- return result;
- }
- }
-
-}