You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2017/01/30 23:03:47 UTC

[40/50] [abbrv] beam git commit: [BEAM-843] Use New DoFn Directly in Flink Runner

[BEAM-843] Use New DoFn Directly in Flink Runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4aaaf8fb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4aaaf8fb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4aaaf8fb

Branch: refs/heads/master
Commit: 4aaaf8fb94222e6e283fa79d9c7dc8d9a730d278
Parents: 27cf68e
Author: JingsongLi <lz...@aliyun.com>
Authored: Wed Jan 18 11:34:06 2017 +0800
Committer: Davor Bonaci <da...@google.com>
Committed: Mon Jan 30 12:38:38 2017 -0800

----------------------------------------------------------------------
 .../GroupAlsoByWindowViaWindowSetNewDoFn.java   | 156 +++++++++++++++++++
 .../wrappers/streaming/DoFnOperator.java        |  69 ++++----
 .../wrappers/streaming/WindowDoFnOperator.java  | 143 +++++++++--------
 3 files changed, 264 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4aaaf8fb/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
new file mode 100644
index 0000000..cff6e00
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.util.Collection;
+import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
+import org.apache.beam.runners.core.triggers.TriggerStateMachines;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.SystemDoFnInternal;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
+import org.apache.beam.sdk.util.state.TimerInternalsFactory;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+
+/**
+ * A general {@link GroupAlsoByWindowsDoFn}. This delegates all of the logic to the
+ * {@link ReduceFnRunner}.
+ */
+@SystemDoFnInternal
+public class GroupAlsoByWindowViaWindowSetNewDoFn<
+        K, InputT, OutputT, W extends BoundedWindow, RinT extends KeyedWorkItem<K, InputT>>
+    extends DoFn<RinT, KV<K, OutputT>> {
+
+  private static final long serialVersionUID = 1L;
+
+  public static <K, InputT, OutputT, W extends BoundedWindow>
+      DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> create(
+          WindowingStrategy<?, W> strategy,
+          StateInternalsFactory<K> stateInternalsFactory,
+          TimerInternalsFactory<K> timerInternalsFactory,
+          SideInputReader sideInputReader,
+          SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn,
+          DoFnRunners.OutputManager outputManager,
+          TupleTag<KV<K, OutputT>> mainTag) {
+    return new GroupAlsoByWindowViaWindowSetNewDoFn<>(
+            strategy, stateInternalsFactory, timerInternalsFactory, sideInputReader,
+            reduceFn, outputManager, mainTag);
+  }
+
+  protected final Aggregator<Long, Long> droppedDueToClosedWindow =
+      createAggregator(
+          GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, Sum.ofLongs());
+  protected final Aggregator<Long, Long> droppedDueToLateness =
+      createAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, Sum.ofLongs());
+  private final WindowingStrategy<Object, W> windowingStrategy;
+  private SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn;
+  private transient StateInternalsFactory<K> stateInternalsFactory;
+  private transient TimerInternalsFactory<K> timerInternalsFactory;
+  private transient SideInputReader sideInputReader;
+  private transient DoFnRunners.OutputManager outputManager;
+  private TupleTag<KV<K, OutputT>> mainTag;
+
+  public GroupAlsoByWindowViaWindowSetNewDoFn(
+          WindowingStrategy<?, W> windowingStrategy,
+          StateInternalsFactory<K> stateInternalsFactory,
+          TimerInternalsFactory<K> timerInternalsFactory,
+          SideInputReader sideInputReader,
+          SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn,
+          DoFnRunners.OutputManager outputManager,
+          TupleTag<KV<K, OutputT>> mainTag) {
+    this.timerInternalsFactory = timerInternalsFactory;
+    this.sideInputReader = sideInputReader;
+    this.outputManager = outputManager;
+    this.mainTag = mainTag;
+    @SuppressWarnings("unchecked")
+    WindowingStrategy<Object, W> noWildcard = (WindowingStrategy<Object, W>) windowingStrategy;
+    this.windowingStrategy = noWildcard;
+    this.reduceFn = reduceFn;
+    this.stateInternalsFactory = stateInternalsFactory;
+  }
+
+  private OutputWindowedValue<KV<K, OutputT>> outputWindowedValue() {
+    return new OutputWindowedValue<KV<K, OutputT>>() {
+      @Override
+      public void outputWindowedValue(
+              KV<K, OutputT> output,
+              Instant timestamp,
+              Collection<? extends BoundedWindow> windows,
+              PaneInfo pane) {
+        outputManager.output(mainTag,
+                WindowedValue.of(output, timestamp, windows, pane));
+      }
+
+      @Override
+      public <SideOutputT> void sideOutputWindowedValue(
+              TupleTag<SideOutputT> tag,
+              SideOutputT output,
+              Instant timestamp,
+              Collection<? extends BoundedWindow> windows,
+              PaneInfo pane) {
+        outputManager.output(tag,
+                WindowedValue.of(output, timestamp, windows, pane));
+      }
+    };
+  }
+
+  @ProcessElement
+  public void processElement(ProcessContext c) throws Exception {
+    KeyedWorkItem<K, InputT> keyedWorkItem = c.element();
+
+    K key = keyedWorkItem.key();
+    StateInternals<K> stateInternals = stateInternalsFactory.stateInternalsForKey(key);
+    TimerInternals timerInternals = timerInternalsFactory.timerInternalsForKey(key);
+
+    ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner =
+        new ReduceFnRunner<>(
+            key,
+            windowingStrategy,
+            ExecutableTriggerStateMachine.create(
+                TriggerStateMachines.stateMachineForTrigger(windowingStrategy.getTrigger())),
+            stateInternals,
+            timerInternals,
+            outputWindowedValue(),
+            sideInputReader,
+            droppedDueToClosedWindow,
+            reduceFn,
+            c.getPipelineOptions());
+
+    reduceFnRunner.processElements(keyedWorkItem.elementsIterable());
+    reduceFnRunner.onTimers(keyedWorkItem.timersIterable());
+    reduceFnRunner.persist();
+  }
+
+  public OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asDoFn() {
+    throw new RuntimeException("Not implement!");
+  }
+
+  public Aggregator<Long, Long> getDroppedDueToLatenessAggregator() {
+    return droppedDueToLateness;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/4aaaf8fb/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index ac85b3c..de0264a 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -28,12 +28,11 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+
 import org.apache.beam.runners.core.AggregatorFactory;
-import org.apache.beam.runners.core.DoFnAdapters;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.ExecutionContext;
-import org.apache.beam.runners.core.OldDoFn;
 import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.SideInputHandler;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
@@ -45,6 +44,8 @@ import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.NullSideInputReader;
@@ -78,10 +79,10 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 
 /**
- * Flink operator for executing {@link OldDoFn DoFns}.
+ * Flink operator for executing {@link DoFn DoFns}.
  *
- * @param <InputT> the input type of the {@link OldDoFn}
- * @param <FnOutputT> the output type of the {@link OldDoFn}
+ * @param <InputT> the input type of the {@link DoFn}
+ * @param <FnOutputT> the output type of the {@link DoFn}
  * @param <OutputT> the output type of the operator, this can be different from the fn output
  *                 type when we have side outputs
  */
@@ -90,7 +91,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
     implements OneInputStreamOperator<WindowedValue<InputT>, OutputT>,
       TwoInputStreamOperator<WindowedValue<InputT>, RawUnionValue, OutputT> {
 
-  protected OldDoFn<InputT, FnOutputT> oldDoFn;
+  protected DoFn<InputT, FnOutputT> doFn;
 
   protected final SerializedPipelineOptions serializedOptions;
 
@@ -108,6 +109,12 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
 
   protected transient SideInputHandler sideInputHandler;
 
+  protected transient SideInputReader sideInputReader;
+
+  protected transient DoFnRunners.OutputManager outputManager;
+
+  private transient DoFnInvoker<InputT, FnOutputT> doFnInvoker;
+
   protected transient long currentInputWatermark;
 
   protected transient long currentOutputWatermark;
@@ -120,9 +127,8 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
 
   private transient Map<String, KvStateSnapshot<?, ?, ?, ?, ?>> restoredSideInputState;
 
-  @Deprecated
   public DoFnOperator(
-      OldDoFn<InputT, FnOutputT> oldDoFn,
+      DoFn<InputT, FnOutputT> doFn,
       TypeInformation<WindowedValue<InputT>> inputType,
       TupleTag<FnOutputT> mainOutputTag,
       List<TupleTag<?>> sideOutputTags,
@@ -131,7 +137,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
       Map<Integer, PCollectionView<?>> sideInputTagMapping,
       Collection<PCollectionView<?>> sideInputs,
       PipelineOptions options) {
-    this.oldDoFn = oldDoFn;
+    this.doFn = doFn;
     this.mainOutputTag = mainOutputTag;
     this.sideOutputTags = sideOutputTags;
     this.sideInputTagMapping = sideInputTagMapping;
@@ -152,44 +158,20 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
     setChainingStrategy(ChainingStrategy.ALWAYS);
   }
 
-  public DoFnOperator(
-      DoFn<InputT, FnOutputT> doFn,
-      TypeInformation<WindowedValue<InputT>> inputType,
-      TupleTag<FnOutputT> mainOutputTag,
-      List<TupleTag<?>> sideOutputTags,
-      OutputManagerFactory<OutputT> outputManagerFactory,
-      WindowingStrategy<?, ?> windowingStrategy,
-      Map<Integer, PCollectionView<?>> sideInputTagMapping,
-      Collection<PCollectionView<?>> sideInputs,
-      PipelineOptions options) {
-    this(
-        DoFnAdapters.toOldDoFn(doFn),
-        inputType,
-        mainOutputTag,
-        sideOutputTags,
-        outputManagerFactory,
-        windowingStrategy,
-        sideInputTagMapping,
-        sideInputs,
-        options);
-  }
-
   protected ExecutionContext.StepContext createStepContext() {
     return new StepContext();
   }
 
   // allow overriding this in WindowDoFnOperator because this one dynamically creates
   // the DoFn
-  protected OldDoFn<InputT, FnOutputT> getOldDoFn() {
-    return oldDoFn;
+  protected DoFn<InputT, FnOutputT> getDoFn() {
+    return doFn;
   }
 
   @Override
   public void open() throws Exception {
     super.open();
 
-    this.oldDoFn = getOldDoFn();
-
     currentInputWatermark = Long.MIN_VALUE;
     currentOutputWatermark = currentInputWatermark;
 
@@ -214,7 +196,8 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
       }
     };
 
-    SideInputReader sideInputReader = NullSideInputReader.of(sideInputs);
+    sideInputReader = NullSideInputReader.of(sideInputs);
+
     if (!sideInputs.isEmpty()) {
       String operatorIdentifier =
           this.getClass().getSimpleName() + "_"
@@ -244,11 +227,18 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
       sideInputReader = sideInputHandler;
     }
 
+    outputManager = outputManagerFactory.create(output);
+
+    this.doFn = getDoFn();
+    doFnInvoker = DoFnInvokers.invokerFor(doFn);
+
+    doFnInvoker.invokeSetup();
+
     DoFnRunner<InputT, FnOutputT> doFnRunner = DoFnRunners.simpleRunner(
         serializedOptions.getPipelineOptions(),
-        oldDoFn,
+        doFn,
         sideInputReader,
-        outputManagerFactory.create(output),
+        outputManager,
         mainOutputTag,
         sideOutputTags,
         createStepContext(),
@@ -258,13 +248,12 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
     pushbackDoFnRunner =
         PushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);
 
-    oldDoFn.setup();
   }
 
   @Override
   public void close() throws Exception {
     super.close();
-    oldDoFn.teardown();
+    doFnInvoker.invokeTeardown();
   }
 
   protected final long getPushbackWatermarkHold() {

http://git-wip-us.apache.org/repos/asf/beam/blob/4aaaf8fb/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index d4273b2..74614ad 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -38,11 +38,11 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ScheduledFuture;
 import javax.annotation.Nullable;
+
 import org.apache.beam.runners.core.ExecutionContext;
-import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
+import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItems;
-import org.apache.beam.runners.core.OldDoFn;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper;
 import org.apache.beam.sdk.coders.Coder;
@@ -56,6 +56,7 @@ import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateInternalsFactory;
 import org.apache.beam.sdk.util.state.StateNamespace;
+import org.apache.beam.sdk.util.state.TimerInternalsFactory;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
@@ -91,6 +92,7 @@ public class WindowDoFnOperator<K, InputT, OutputT>
   private transient Map<Long, ScheduledFuture<?>> processingTimeTimerFutures;
 
   private transient FlinkStateInternals<K> stateInternals;
+  private transient FlinkTimerInternals timerInternals;
 
   private final SystemReduceFn<K, InputT, ?, OutputT, BoundedWindow> systemReduceFn;
 
@@ -106,7 +108,7 @@ public class WindowDoFnOperator<K, InputT, OutputT>
       PipelineOptions options,
       Coder<K> keyCoder) {
     super(
-        (OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>>) null,
+        null,
         inputType,
         mainOutputTag,
         sideOutputTags,
@@ -124,7 +126,7 @@ public class WindowDoFnOperator<K, InputT, OutputT>
   }
 
   @Override
-  protected OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> getOldDoFn() {
+  protected DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> getDoFn() {
     StateInternalsFactory<K> stateInternalsFactory = new StateInternalsFactory<K>() {
       @Override
       public StateInternals<K> stateInternalsForKey(K key) {
@@ -133,15 +135,23 @@ public class WindowDoFnOperator<K, InputT, OutputT>
         return stateInternals;
       }
     };
+    TimerInternalsFactory<K> timerInternalsFactory = new TimerInternalsFactory<K>() {
+      @Override
+      public TimerInternals timerInternalsForKey(K key) {
+        //this will implicitly be keyed like the StateInternalsFactory
+        return timerInternals;
+      }
+    };
 
     // we have to do the unchecked cast because GroupAlsoByWindowViaWindowSetDoFn.create
     // has the window type as generic parameter while WindowingStrategy is almost always
     // untyped.
     @SuppressWarnings("unchecked")
-    OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> oldDoFn =
-        GroupAlsoByWindowViaWindowSetDoFn.create(
-            windowingStrategy, stateInternalsFactory, (SystemReduceFn) systemReduceFn);
-    return oldDoFn;
+    DoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFn =
+        GroupAlsoByWindowViaWindowSetNewDoFn.create(
+            windowingStrategy, stateInternalsFactory, timerInternalsFactory, sideInputReader,
+                (SystemReduceFn) systemReduceFn, outputManager, mainOutputTag);
+    return doFn;
   }
 
 
@@ -183,6 +193,7 @@ public class WindowDoFnOperator<K, InputT, OutputT>
     processingTimeTimerFutures = new HashMap<>();
 
     stateInternals = new FlinkStateInternals<>(getStateBackend(), keyCoder);
+    timerInternals = new FlinkTimerInternals();
 
     // call super at the end because this will call getDoFn() which requires stateInternals
     // to be set
@@ -448,75 +459,79 @@ public class WindowDoFnOperator<K, InputT, OutputT>
 
     @Override
     public TimerInternals timerInternals() {
-      return new TimerInternals() {
-        @Override
-        public void setTimer(
+      return timerInternals;
+    }
+  }
+
+  private class FlinkTimerInternals implements TimerInternals {
+
+    @Override
+    public void setTimer(
             StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) {
-          throw new UnsupportedOperationException("Setting a timer by ID is not yet supported.");
-        }
+      throw new UnsupportedOperationException("Setting a timer by ID is not yet supported.");
+    }
 
-        @Deprecated
-        @Override
-        public void setTimer(TimerData timerKey) {
-          if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) {
-            registerEventTimeTimer(timerKey);
-          } else if (timerKey.getDomain().equals(TimeDomain.PROCESSING_TIME)) {
-            registerProcessingTimeTimer(timerKey);
-          } else {
-            throw new UnsupportedOperationException(
+    @Deprecated
+    @Override
+    public void setTimer(TimerData timerKey) {
+      if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) {
+        registerEventTimeTimer(timerKey);
+      } else if (timerKey.getDomain().equals(TimeDomain.PROCESSING_TIME)) {
+        registerProcessingTimeTimer(timerKey);
+      } else {
+        throw new UnsupportedOperationException(
                 "Unsupported time domain: " + timerKey.getDomain());
-          }
-        }
+      }
+    }
 
-        @Override
-        public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
-          throw new UnsupportedOperationException(
+    @Deprecated
+    @Override
+    public void deleteTimer(StateNamespace namespace, String timerId) {
+      throw new UnsupportedOperationException(
               "Canceling of a timer by ID is not yet supported.");
-        }
+    }
 
-        @Deprecated
-        @Override
-        public void deleteTimer(StateNamespace namespace, String timerId) {
-          throw new UnsupportedOperationException(
-              "Canceling of a timer by ID is not yet supported.");
-        }
+    @Override
+    public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
+      throw new UnsupportedOperationException(
+          "Canceling of a timer by ID is not yet supported.");
+    }
 
-        @Deprecated
-        @Override
-        public void deleteTimer(TimerData timerKey) {
-          if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) {
-            deleteEventTimeTimer(timerKey);
-          } else if (timerKey.getDomain().equals(TimeDomain.PROCESSING_TIME)) {
-            deleteProcessingTimeTimer(timerKey);
-          } else {
-            throw new UnsupportedOperationException(
+    @Deprecated
+    @Override
+    public void deleteTimer(TimerData timerKey) {
+      if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) {
+        deleteEventTimeTimer(timerKey);
+      } else if (timerKey.getDomain().equals(TimeDomain.PROCESSING_TIME)) {
+        deleteProcessingTimeTimer(timerKey);
+      } else {
+        throw new UnsupportedOperationException(
                 "Unsupported time domain: " + timerKey.getDomain());
-          }
-        }
+      }
+    }
 
-        @Override
-        public Instant currentProcessingTime() {
-          return new Instant(getCurrentProcessingTime());
-        }
+    @Override
+    public Instant currentProcessingTime() {
+      return new Instant(getCurrentProcessingTime());
+    }
 
-        @Nullable
-        @Override
-        public Instant currentSynchronizedProcessingTime() {
-          return new Instant(getCurrentProcessingTime());
-        }
+    @Nullable
+    @Override
+    public Instant currentSynchronizedProcessingTime() {
+      return new Instant(getCurrentProcessingTime());
+    }
 
-        @Override
-        public Instant currentInputWatermarkTime() {
-          return new Instant(Math.min(currentInputWatermark, getPushbackWatermarkHold()));
-        }
+    @Override
+    public Instant currentInputWatermarkTime() {
+      return new Instant(Math.min(currentInputWatermark, getPushbackWatermarkHold()));
+    }
 
-        @Nullable
-        @Override
-        public Instant currentOutputWatermarkTime() {
-          return new Instant(currentOutputWatermark);
-        }
-      };
+    @Nullable
+    @Override
+    public Instant currentOutputWatermarkTime() {
+      return new Instant(currentOutputWatermark);
     }
+
   }
 
 }