You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2017/01/30 23:03:47 UTC
[40/50] [abbrv] beam git commit: [BEAM-843] Use New DoFn Directly in
Flink Runner
[BEAM-843] Use New DoFn Directly in Flink Runner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4aaaf8fb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4aaaf8fb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4aaaf8fb
Branch: refs/heads/master
Commit: 4aaaf8fb94222e6e283fa79d9c7dc8d9a730d278
Parents: 27cf68e
Author: JingsongLi <lz...@aliyun.com>
Authored: Wed Jan 18 11:34:06 2017 +0800
Committer: Davor Bonaci <da...@google.com>
Committed: Mon Jan 30 12:38:38 2017 -0800
----------------------------------------------------------------------
.../GroupAlsoByWindowViaWindowSetNewDoFn.java | 156 +++++++++++++++++++
.../wrappers/streaming/DoFnOperator.java | 69 ++++----
.../wrappers/streaming/WindowDoFnOperator.java | 143 +++++++++--------
3 files changed, 264 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/4aaaf8fb/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
new file mode 100644
index 0000000..cff6e00
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.util.Collection;
+import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
+import org.apache.beam.runners.core.triggers.TriggerStateMachines;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.SystemDoFnInternal;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
+import org.apache.beam.sdk.util.state.TimerInternalsFactory;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+
+/**
+ * A general {@link GroupAlsoByWindowsDoFn}. This delegates all of the logic to the
+ * {@link ReduceFnRunner}.
+ */
+@SystemDoFnInternal
+public class GroupAlsoByWindowViaWindowSetNewDoFn<
+ K, InputT, OutputT, W extends BoundedWindow, RinT extends KeyedWorkItem<K, InputT>>
+ extends DoFn<RinT, KV<K, OutputT>> {
+
+ private static final long serialVersionUID = 1L;
+
+ public static <K, InputT, OutputT, W extends BoundedWindow>
+ DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> create(
+ WindowingStrategy<?, W> strategy,
+ StateInternalsFactory<K> stateInternalsFactory,
+ TimerInternalsFactory<K> timerInternalsFactory,
+ SideInputReader sideInputReader,
+ SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn,
+ DoFnRunners.OutputManager outputManager,
+ TupleTag<KV<K, OutputT>> mainTag) {
+ return new GroupAlsoByWindowViaWindowSetNewDoFn<>(
+ strategy, stateInternalsFactory, timerInternalsFactory, sideInputReader,
+ reduceFn, outputManager, mainTag);
+ }
+
+ protected final Aggregator<Long, Long> droppedDueToClosedWindow =
+ createAggregator(
+ GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs());
+ protected final Aggregator<Long, Long> droppedDueToLateness =
+ createAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs());
+ private final WindowingStrategy<Object, W> windowingStrategy;
+ private SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn;
+ private transient StateInternalsFactory<K> stateInternalsFactory;
+ private transient TimerInternalsFactory<K> timerInternalsFactory;
+ private transient SideInputReader sideInputReader;
+ private transient DoFnRunners.OutputManager outputManager;
+ private TupleTag<KV<K, OutputT>> mainTag;
+
+ public GroupAlsoByWindowViaWindowSetNewDoFn(
+ WindowingStrategy<?, W> windowingStrategy,
+ StateInternalsFactory<K> stateInternalsFactory,
+ TimerInternalsFactory<K> timerInternalsFactory,
+ SideInputReader sideInputReader,
+ SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn,
+ DoFnRunners.OutputManager outputManager,
+ TupleTag<KV<K, OutputT>> mainTag) {
+ this.timerInternalsFactory = timerInternalsFactory;
+ this.sideInputReader = sideInputReader;
+ this.outputManager = outputManager;
+ this.mainTag = mainTag;
+ @SuppressWarnings("unchecked")
+ WindowingStrategy<Object, W> noWildcard = (WindowingStrategy<Object, W>) windowingStrategy;
+ this.windowingStrategy = noWildcard;
+ this.reduceFn = reduceFn;
+ this.stateInternalsFactory = stateInternalsFactory;
+ }
+
+ private OutputWindowedValue<KV<K, OutputT>> outputWindowedValue() {
+ return new OutputWindowedValue<KV<K, OutputT>>() {
+ @Override
+ public void outputWindowedValue(
+ KV<K, OutputT> output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane) {
+ outputManager.output(mainTag,
+ WindowedValue.of(output, timestamp, windows, pane));
+ }
+
+ @Override
+ public <SideOutputT> void sideOutputWindowedValue(
+ TupleTag<SideOutputT> tag,
+ SideOutputT output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane) {
+ outputManager.output(tag,
+ WindowedValue.of(output, timestamp, windows, pane));
+ }
+ };
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ KeyedWorkItem<K, InputT> keyedWorkItem = c.element();
+
+ K key = keyedWorkItem.key();
+ StateInternals<K> stateInternals = stateInternalsFactory.stateInternalsForKey(key);
+ TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey(key);
+
+ ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner =
+ new ReduceFnRunner<>(
+ key,
+ windowingStrategy,
+ ExecutableTriggerStateMachine.create(
+ TriggerStateMachines.stateMachineForTrigger(windowingStrategy.getTrigger())),
+ stateInternals,
+ timerInternals,
+ outputWindowedValue(),
+ sideInputReader,
+ droppedDueToClosedWindow,
+ reduceFn,
+ c.getPipelineOptions());
+
+ reduceFnRunner.processElements(keyedWorkItem.elementsIterable());
+ reduceFnRunner.onTimers(keyedWorkItem.timersIterable());
+ reduceFnRunner.persist();
+ }
+
+ public OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asDoFn() {
+ throw new RuntimeException("Not implement!");
+ }
+
+ public Aggregator<Long, Long> getDroppedDueToLatenessAggregator() {
+ return droppedDueToLateness;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/4aaaf8fb/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index ac85b3c..de0264a 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -28,12 +28,11 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+
import org.apache.beam.runners.core.AggregatorFactory;
-import org.apache.beam.runners.core.DoFnAdapters;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
import org.apache.beam.runners.core.ExecutionContext;
-import org.apache.beam.runners.core.OldDoFn;
import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.SideInputHandler;
import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
@@ -45,6 +44,8 @@ import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.NullSideInputReader;
@@ -78,10 +79,10 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
/**
- * Flink operator for executing {@link OldDoFn DoFns}.
+ * Flink operator for executing {@link DoFn DoFns}.
*
- * @param <InputT> the input type of the {@link OldDoFn}
- * @param <FnOutputT> the output type of the {@link OldDoFn}
+ * @param <InputT> the input type of the {@link DoFn}
+ * @param <FnOutputT> the output type of the {@link DoFn}
* @param <OutputT> the output type of the operator, this can be different from the fn output
* type when we have side outputs
*/
@@ -90,7 +91,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
implements OneInputStreamOperator<WindowedValue<InputT>, OutputT>,
TwoInputStreamOperator<WindowedValue<InputT>, RawUnionValue, OutputT> {
- protected OldDoFn<InputT, FnOutputT> oldDoFn;
+ protected DoFn<InputT, FnOutputT> doFn;
protected final SerializedPipelineOptions serializedOptions;
@@ -108,6 +109,12 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
protected transient SideInputHandler sideInputHandler;
+ protected transient SideInputReader sideInputReader;
+
+ protected transient DoFnRunners.OutputManager outputManager;
+
+ private transient DoFnInvoker<InputT, FnOutputT> doFnInvoker;
+
protected transient long currentInputWatermark;
protected transient long currentOutputWatermark;
@@ -120,9 +127,8 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
private transient Map<String, KvStateSnapshot<?, ?, ?, ?, ?>> restoredSideInputState;
- @Deprecated
public DoFnOperator(
- OldDoFn<InputT, FnOutputT> oldDoFn,
+ DoFn<InputT, FnOutputT> doFn,
TypeInformation<WindowedValue<InputT>> inputType,
TupleTag<FnOutputT> mainOutputTag,
List<TupleTag<?>> sideOutputTags,
@@ -131,7 +137,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
Map<Integer, PCollectionView<?>> sideInputTagMapping,
Collection<PCollectionView<?>> sideInputs,
PipelineOptions options) {
- this.oldDoFn = oldDoFn;
+ this.doFn = doFn;
this.mainOutputTag = mainOutputTag;
this.sideOutputTags = sideOutputTags;
this.sideInputTagMapping = sideInputTagMapping;
@@ -152,44 +158,20 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
setChainingStrategy(ChainingStrategy.ALWAYS);
}
- public DoFnOperator(
- DoFn<InputT, FnOutputT> doFn,
- TypeInformation<WindowedValue<InputT>> inputType,
- TupleTag<FnOutputT> mainOutputTag,
- List<TupleTag<?>> sideOutputTags,
- OutputManagerFactory<OutputT> outputManagerFactory,
- WindowingStrategy<?, ?> windowingStrategy,
- Map<Integer, PCollectionView<?>> sideInputTagMapping,
- Collection<PCollectionView<?>> sideInputs,
- PipelineOptions options) {
- this(
- DoFnAdapters.toOldDoFn(doFn),
- inputType,
- mainOutputTag,
- sideOutputTags,
- outputManagerFactory,
- windowingStrategy,
- sideInputTagMapping,
- sideInputs,
- options);
- }
-
protected ExecutionContext.StepContext createStepContext() {
return new StepContext();
}
// allow overriding this in WindowDoFnOperator because this one dynamically creates
// the DoFn
- protected OldDoFn<InputT, FnOutputT> getOldDoFn() {
- return oldDoFn;
+ protected DoFn<InputT, FnOutputT> getDoFn() {
+ return doFn;
}
@Override
public void open() throws Exception {
super.open();
- this.oldDoFn = getOldDoFn();
-
currentInputWatermark = Long.MIN_VALUE;
currentOutputWatermark = currentInputWatermark;
@@ -214,7 +196,8 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
}
};
- SideInputReader sideInputReader = NullSideInputReader.of(sideInputs);
+ sideInputReader = NullSideInputReader.of(sideInputs);
+
if (!sideInputs.isEmpty()) {
String operatorIdentifier =
this.getClass().getSimpleName() + "_"
@@ -244,11 +227,18 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
sideInputReader = sideInputHandler;
}
+ outputManager = outputManagerFactory.create(output);
+
+ this.doFn = getDoFn();
+ doFnInvoker = DoFnInvokers.invokerFor(doFn);
+
+ doFnInvoker.invokeSetup();
+
DoFnRunner<InputT, FnOutputT> doFnRunner = DoFnRunners.simpleRunner(
serializedOptions.getPipelineOptions(),
- oldDoFn,
+ doFn,
sideInputReader,
- outputManagerFactory.create(output),
+ outputManager,
mainOutputTag,
sideOutputTags,
createStepContext(),
@@ -258,13 +248,12 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
pushbackDoFnRunner =
PushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);
- oldDoFn.setup();
}
@Override
public void close() throws Exception {
super.close();
- oldDoFn.teardown();
+ doFnInvoker.invokeTeardown();
}
protected final long getPushbackWatermarkHold() {
http://git-wip-us.apache.org/repos/asf/beam/blob/4aaaf8fb/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index d4273b2..74614ad 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -38,11 +38,11 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import javax.annotation.Nullable;
+
import org.apache.beam.runners.core.ExecutionContext;
-import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
+import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItems;
-import org.apache.beam.runners.core.OldDoFn;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper;
import org.apache.beam.sdk.coders.Coder;
@@ -56,6 +56,7 @@ import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.util.state.StateInternalsFactory;
import org.apache.beam.sdk.util.state.StateNamespace;
+import org.apache.beam.sdk.util.state.TimerInternalsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
@@ -91,6 +92,7 @@ public class WindowDoFnOperator<K, InputT, OutputT>
private transient Map<Long, ScheduledFuture<?>> processingTimeTimerFutures;
private transient FlinkStateInternals<K> stateInternals;
+ private transient FlinkTimerInternals timerInternals;
private final SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> systemReduceFn;
@@ -106,7 +108,7 @@ public class WindowDoFnOperator<K, InputT, OutputT>
PipelineOptions options,
Coder<K> keyCoder) {
super(
- (OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>>) null,
+ null,
inputType,
mainOutputTag,
sideOutputTags,
@@ -124,7 +126,7 @@ public class WindowDoFnOperator<K, InputT, OutputT>
}
@Override
- protected OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> getOldDoFn() {
+ protected DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> getDoFn() {
StateInternalsFactory<K> stateInternalsFactory = new StateInternalsFactory<K>() {
@Override
public StateInternals<K> stateInternalsForKey(K key) {
@@ -133,15 +135,23 @@ public class WindowDoFnOperator<K, InputT, OutputT>
return stateInternals;
}
};
+ TimerInternalsFactory<K> timerInternalsFactory = new TimerInternalsFactory<K>() {
+ @Override
+ public TimerInternals timerInternalsForKey(K key) {
+ //this will implicitly be keyed like the StateInternalsFactory
+ return timerInternals;
+ }
+ };
// we have to do the unchecked cast because GroupAlsoByWindowViaWindowSetDoFn.create
// has the window type as generic parameter while WindowingStrategy is almost always
// untyped.
@SuppressWarnings("unchecked")
- OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> oldDoFn =
- GroupAlsoByWindowViaWindowSetDoFn.create(
- windowingStrategy, stateInternalsFactory, (SystemReduceFn) systemReduceFn);
- return oldDoFn;
+ DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFn =
+ GroupAlsoByWindowViaWindowSetNewDoFn.create(
+ windowingStrategy, stateInternalsFactory, timerInternalsFactory, sideInputReader,
+ (SystemReduceFn) systemReduceFn, outputManager, mainOutputTag);
+ return doFn;
}
@@ -183,6 +193,7 @@ public class WindowDoFnOperator<K, InputT, OutputT>
processingTimeTimerFutures = new HashMap<>();
stateInternals = new FlinkStateInternals<>(getStateBackend(), keyCoder);
+ timerInternals = new FlinkTimerInternals();
// call super at the end because this will call getDoFn() which requires stateInternals
// to be set
@@ -448,75 +459,79 @@ public class WindowDoFnOperator<K, InputT, OutputT>
@Override
public TimerInternals timerInternals() {
- return new TimerInternals() {
- @Override
- public void setTimer(
+ return timerInternals;
+ }
+ }
+
+ private class FlinkTimerInternals implements TimerInternals {
+
+ @Override
+ public void setTimer(
StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) {
- throw new UnsupportedOperationException("Setting a timer by ID is not yet supported.");
- }
+ throw new UnsupportedOperationException("Setting a timer by ID is not yet supported.");
+ }
- @Deprecated
- @Override
- public void setTimer(TimerData timerKey) {
- if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) {
- registerEventTimeTimer(timerKey);
- } else if (timerKey.getDomain().equals(TimeDomain.PROCESSING_TIME)) {
- registerProcessingTimeTimer(timerKey);
- } else {
- throw new UnsupportedOperationException(
+ @Deprecated
+ @Override
+ public void setTimer(TimerData timerKey) {
+ if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) {
+ registerEventTimeTimer(timerKey);
+ } else if (timerKey.getDomain().equals(TimeDomain.PROCESSING_TIME)) {
+ registerProcessingTimeTimer(timerKey);
+ } else {
+ throw new UnsupportedOperationException(
"Unsupported time domain: " + timerKey.getDomain());
- }
- }
+ }
+ }
- @Override
- public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
- throw new UnsupportedOperationException(
+ @Deprecated
+ @Override
+ public void deleteTimer(StateNamespace namespace, String timerId) {
+ throw new UnsupportedOperationException(
"Canceling of a timer by ID is not yet supported.");
- }
+ }
- @Deprecated
- @Override
- public void deleteTimer(StateNamespace namespace, String timerId) {
- throw new UnsupportedOperationException(
- "Canceling of a timer by ID is not yet supported.");
- }
+ @Override
+ public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
+ throw new UnsupportedOperationException(
+ "Canceling of a timer by ID is not yet supported.");
+ }
- @Deprecated
- @Override
- public void deleteTimer(TimerData timerKey) {
- if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) {
- deleteEventTimeTimer(timerKey);
- } else if (timerKey.getDomain().equals(TimeDomain.PROCESSING_TIME)) {
- deleteProcessingTimeTimer(timerKey);
- } else {
- throw new UnsupportedOperationException(
+ @Deprecated
+ @Override
+ public void deleteTimer(TimerData timerKey) {
+ if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) {
+ deleteEventTimeTimer(timerKey);
+ } else if (timerKey.getDomain().equals(TimeDomain.PROCESSING_TIME)) {
+ deleteProcessingTimeTimer(timerKey);
+ } else {
+ throw new UnsupportedOperationException(
"Unsupported time domain: " + timerKey.getDomain());
- }
- }
+ }
+ }
- @Override
- public Instant currentProcessingTime() {
- return new Instant(getCurrentProcessingTime());
- }
+ @Override
+ public Instant currentProcessingTime() {
+ return new Instant(getCurrentProcessingTime());
+ }
- @Nullable
- @Override
- public Instant currentSynchronizedProcessingTime() {
- return new Instant(getCurrentProcessingTime());
- }
+ @Nullable
+ @Override
+ public Instant currentSynchronizedProcessingTime() {
+ return new Instant(getCurrentProcessingTime());
+ }
- @Override
- public Instant currentInputWatermarkTime() {
- return new Instant(Math.min(currentInputWatermark, getPushbackWatermarkHold()));
- }
+ @Override
+ public Instant currentInputWatermarkTime() {
+ return new Instant(Math.min(currentInputWatermark, getPushbackWatermarkHold()));
+ }
- @Nullable
- @Override
- public Instant currentOutputWatermarkTime() {
- return new Instant(currentOutputWatermark);
- }
- };
+ @Nullable
+ @Override
+ public Instant currentOutputWatermarkTime() {
+ return new Instant(currentOutputWatermark);
}
+
}
}