You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by re...@apache.org on 2022/02/23 18:28:38 UTC

[beam] branch revert-16748-revert-timer created (now f68daa0)

This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a change to branch revert-16748-revert-timer
in repository https://gitbox.apache.org/repos/asf/beam.git.


      at f68daa0  Revert "[BEAM-11971] Revert "Fix timer consistency in direct runner" (#16748)"

This branch includes the following new commits:

     new f68daa0  Revert "[BEAM-11971] Revert "Fix timer consistency in direct runner" (#16748)"

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[beam] 01/01: Revert "[BEAM-11971] Revert "Fix timer consistency in direct runner" (#16748)"

Posted by re...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch revert-16748-revert-timer
in repository https://gitbox.apache.org/repos/asf/beam.git

commit f68daa08e24cb58cfcb7b6967598ee073c017949
Author: reuvenlax <re...@google.com>
AuthorDate: Wed Feb 23 10:27:22 2022 -0800

    Revert "[BEAM-11971] Revert "Fix timer consistency in direct runner" (#16748)"
    
    This reverts commit 98e5fc5d3a3229a3185005d7557f0f5c34ef7ec5.
---
 .../apache/beam/runners/core/TimerInternals.java   |  17 +-
 .../beam/runners/direct/DirectTimerInternals.java  |  72 +++---
 .../direct/ExecutorServiceParallelExecutor.java    |   2 +-
 .../beam/runners/direct/QuiescenceDriver.java      |  78 ++++---
 .../direct/StatefulParDoEvaluatorFactory.java      |  83 ++++---
 .../beam/runners/direct/WatermarkManager.java      | 250 +++++++++------------
 .../apache/beam/runners/local/ExecutionDriver.java |   2 +-
 .../org/apache/beam/sdk/transforms/ParDoTest.java  |  10 +-
 8 files changed, 232 insertions(+), 282 deletions(-)

diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
index 965be82..143254a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TimerInternals.java
@@ -262,11 +262,22 @@ public interface TimerInternals {
               .compare(this.getDomain(), that.getDomain())
               .compare(this.getTimerId(), that.getTimerId())
               .compare(this.getTimerFamilyId(), that.getTimerFamilyId());
-      if (chain.result() == 0 && !this.getNamespace().equals(that.getNamespace())) {
+      int compResult = chain.result();
+      if (compResult == 0 && !this.getNamespace().equals(that.getNamespace())) {
         // Obtaining the stringKey may be expensive; only do so if required
-        chain = chain.compare(getNamespace().stringKey(), that.getNamespace().stringKey());
+        compResult = this.getNamespace().stringKey().compareTo(that.getNamespace().stringKey());
       }
-      return chain.result();
+      return compResult;
+    }
+
+    public String stringKey() {
+      return getNamespace().stringKey()
+          + "/"
+          + getDomain().toString()
+          + "/"
+          + getTimerFamilyId()
+          + ":"
+          + getTimerId();
     }
   }
 
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
index 5a477bb..23d011f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.runners.direct;
 
+import java.util.Map;
+import java.util.NavigableSet;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
@@ -24,6 +26,8 @@ import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate.TimerUpdateBu
 import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Instant;
 
@@ -35,6 +39,8 @@ class DirectTimerInternals implements TimerInternals {
   private final Clock processingTimeClock;
   private final TransformWatermarks watermarks;
   private final TimerUpdateBuilder timerUpdateBuilder;
+  private final Map<TimeDomain, NavigableSet<TimerData>> modifiedTimers;
+  private final Map<String, TimerData> modifiedTimerIds;
 
   public static DirectTimerInternals create(
       Clock clock, TransformWatermarks watermarks, TimerUpdateBuilder timerUpdateBuilder) {
@@ -46,6 +52,11 @@ class DirectTimerInternals implements TimerInternals {
     this.processingTimeClock = clock;
     this.watermarks = watermarks;
     this.timerUpdateBuilder = timerUpdateBuilder;
+    this.modifiedTimers = Maps.newHashMap();
+    this.modifiedTimers.put(TimeDomain.EVENT_TIME, Sets.newTreeSet());
+    this.modifiedTimers.put(TimeDomain.PROCESSING_TIME, Sets.newTreeSet());
+    this.modifiedTimers.put(TimeDomain.SYNCHRONIZED_PROCESSING_TIME, Sets.newTreeSet());
+    this.modifiedTimerIds = Maps.newHashMap();
   }
 
   @Override
@@ -56,8 +67,7 @@ class DirectTimerInternals implements TimerInternals {
       Instant target,
       Instant outputTimestamp,
       TimeDomain timeDomain) {
-    timerUpdateBuilder.setTimer(
-        TimerData.of(timerId, timerFamilyId, namespace, target, outputTimestamp, timeDomain));
+    setTimer(TimerData.of(timerId, timerFamilyId, namespace, target, outputTimestamp, timeDomain));
   }
 
   /**
@@ -68,6 +78,8 @@ class DirectTimerInternals implements TimerInternals {
   @Override
   public void setTimer(TimerData timerData) {
     timerUpdateBuilder.setTimer(timerData);
+    getModifiedTimersOrdered(timerData.getDomain()).add(timerData);
+    modifiedTimerIds.put(timerData.stringKey(), timerData);
   }
 
   @Override
@@ -93,27 +105,25 @@ class DirectTimerInternals implements TimerInternals {
   /** @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}. */
   @Deprecated
   @Override
-  public void deleteTimer(TimerData timerKey) {
-    timerUpdateBuilder.deletedTimer(timerKey);
+  public void deleteTimer(TimerData timerData) {
+    timerUpdateBuilder.deletedTimer(timerData);
+    modifiedTimerIds.put(timerData.stringKey(), timerData);
   }
 
   public TimerUpdate getTimerUpdate() {
     return timerUpdateBuilder.build();
   }
 
-  public boolean containsUpdateForTimeBefore(
-      Instant maxWatermarkTime, Instant maxProcessingTime, Instant maxSynchronizedProcessingTime) {
-    TimerUpdate update = timerUpdateBuilder.build();
-    return hasTimeBefore(
-            update.getSetTimers(),
-            maxWatermarkTime,
-            maxProcessingTime,
-            maxSynchronizedProcessingTime)
-        || hasTimeBefore(
-            update.getDeletedTimers(),
-            maxWatermarkTime,
-            maxProcessingTime,
-            maxSynchronizedProcessingTime);
+  public NavigableSet<TimerData> getModifiedTimersOrdered(TimeDomain timeDomain) {
+    NavigableSet<TimerData> modified = modifiedTimers.get(timeDomain);
+    if (modified == null) {
+      throw new IllegalStateException("Unexpected time domain " + timeDomain);
+    }
+    return modified;
+  }
+
+  public Map<String, TimerData> getModifiedTimerIds() {
+    return modifiedTimerIds;
   }
 
   @Override
@@ -135,32 +145,4 @@ class DirectTimerInternals implements TimerInternals {
   public @Nullable Instant currentOutputWatermarkTime() {
     return watermarks.getOutputWatermark();
   }
-
-  private boolean hasTimeBefore(
-      Iterable<? extends TimerData> timers,
-      Instant maxWatermarkTime,
-      Instant maxProcessingTime,
-      Instant maxSynchronizedProcessingTime) {
-    for (TimerData timerData : timers) {
-      Instant currentTime;
-      switch (timerData.getDomain()) {
-        case EVENT_TIME:
-          currentTime = maxWatermarkTime;
-          break;
-        case PROCESSING_TIME:
-          currentTime = maxProcessingTime;
-          break;
-        case SYNCHRONIZED_PROCESSING_TIME:
-          currentTime = maxSynchronizedProcessingTime;
-          break;
-        default:
-          throw new RuntimeException("Unexpected timeDomain " + timerData.getDomain());
-      }
-      if (timerData.getTimestamp().isBefore(currentTime)
-          || timerData.getTimestamp().isEqual(currentTime)) {
-        return true;
-      }
-    }
-    return false;
-  }
 }
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 7ffd261..44f2974 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -173,7 +173,7 @@ final class ExecutorServiceParallelExecutor
           @Override
           public void run() {
             DriverState drive = executionDriver.drive();
-            if (drive.isTermainal()) {
+            if (drive.isTerminal()) {
               State newPipelineState = State.UNKNOWN;
               switch (drive) {
                 case FAILED:
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java
index 3499967..b5e5736 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/QuiescenceDriver.java
@@ -24,7 +24,6 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -39,6 +38,7 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -80,7 +80,7 @@ class QuiescenceDriver implements ExecutionDriver {
   // watermark of a PTransform before enqueuing the resulting bundle to pendingUpdates of downstream
   // PTransform, which can lead to watermark being updated past the emitted elements.
   private final Map<AppliedPTransform<?, ?, ?>, Collection<CommittedBundle<?>>> inflightBundles =
-      new ConcurrentHashMap<>();
+      Maps.newHashMap();
 
   private final AtomicReference<ExecutorState> state =
       new AtomicReference<>(ExecutorState.QUIESCENT);
@@ -164,15 +164,17 @@ class QuiescenceDriver implements ExecutionDriver {
 
   private void processBundle(
       CommittedBundle<?> bundle, AppliedPTransform<?, ?, ?> consumer, CompletionCallback callback) {
-    inflightBundles.compute(
-        consumer,
-        (k, v) -> {
-          if (v == null) {
-            v = new ArrayList<>();
-          }
-          v.add(bundle);
-          return v;
-        });
+    synchronized (inflightBundles) {
+      inflightBundles.compute(
+          consumer,
+          (k, v) -> {
+            if (v == null) {
+              v = new ArrayList<>();
+            }
+            v.add(bundle);
+            return v;
+          });
+    }
     outstandingWork.incrementAndGet();
     bundleProcessor.process(bundle, consumer, callback);
   }
@@ -180,24 +182,28 @@ class QuiescenceDriver implements ExecutionDriver {
   /** Fires any available timers. */
   private void fireTimers() {
     try {
-      for (FiredTimers<AppliedPTransform<?, ?, ?>> transformTimers :
-          evaluationContext.extractFiredTimers(inflightBundles.keySet())) {
-        Collection<TimerData> delivery = transformTimers.getTimers();
-        KeyedWorkItem<?, Object> work =
-            KeyedWorkItems.timersWorkItem(transformTimers.getKey().getKey(), delivery);
-        @SuppressWarnings({"unchecked", "rawtypes"})
-        CommittedBundle<?> bundle =
-            evaluationContext
-                .createKeyedBundle(
-                    transformTimers.getKey(),
-                    (PCollection)
-                        Iterables.getOnlyElement(
-                            transformTimers.getExecutable().getMainInputs().values()))
-                .add(WindowedValue.valueInGlobalWindow(work))
-                .commit(evaluationContext.now());
-        processBundle(
-            bundle, transformTimers.getExecutable(), new TimerIterableCompletionCallback(delivery));
-        state.set(ExecutorState.ACTIVE);
+      synchronized (inflightBundles) {
+        for (FiredTimers<AppliedPTransform<?, ?, ?>> transformTimers :
+            evaluationContext.extractFiredTimers(inflightBundles.keySet())) {
+          Collection<TimerData> delivery = transformTimers.getTimers();
+          KeyedWorkItem<?, Object> work =
+              KeyedWorkItems.timersWorkItem(transformTimers.getKey().getKey(), delivery);
+          @SuppressWarnings({"unchecked", "rawtypes"})
+          CommittedBundle<?> bundle =
+              evaluationContext
+                  .createKeyedBundle(
+                      transformTimers.getKey(),
+                      (PCollection)
+                          Iterables.getOnlyElement(
+                              transformTimers.getExecutable().getMainInputs().values()))
+                  .add(WindowedValue.valueInGlobalWindow(work))
+                  .commit(evaluationContext.now());
+          processBundle(
+              bundle,
+              transformTimers.getExecutable(),
+              new TimerIterableCompletionCallback(delivery));
+          state.set(ExecutorState.ACTIVE);
+        }
       }
     } catch (Exception e) {
       LOG.error("Internal Error while delivering timers", e);
@@ -313,12 +319,14 @@ class QuiescenceDriver implements ExecutionDriver {
         state.set(ExecutorState.ACTIVE);
       }
       outstandingWork.decrementAndGet();
-      inflightBundles.compute(
-          result.getTransform(),
-          (k, v) -> {
-            v.remove(inputBundle);
-            return v.isEmpty() ? null : v;
-          });
+      synchronized (inflightBundles) {
+        inflightBundles.compute(
+            result.getTransform(),
+            (k, v) -> {
+              v.remove(inputBundle);
+              return v.isEmpty() ? null : v;
+            });
+      }
       return committedResult;
     }
 
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index ebd305f..054b22d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -17,14 +17,10 @@
  */
 package org.apache.beam.runners.direct;
 
-import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
-
 import com.google.auto.value.AutoValue;
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.PriorityQueue;
+import java.util.NavigableSet;
+import javax.annotation.Nullable;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItems;
 import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
@@ -47,7 +43,6 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Ordering;
 import org.joda.time.Instant;
 
 /** A {@link TransformEvaluatorFactory} for stateful {@link ParDo}. */
@@ -155,7 +150,6 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo
       implements TransformEvaluator<KeyedWorkItem<K, KV<K, InputT>>> {
 
     private final DoFnLifecycleManagerRemovingTransformEvaluator<KV<K, InputT>> delegateEvaluator;
-    private final List<TimerData> pushedBackTimers = new ArrayList<>();
     private final DirectTimerInternals timerInternals;
 
     DirectStepContext stepContext;
@@ -174,45 +168,46 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo
       for (WindowedValue<KV<K, InputT>> windowedValue : gbkResult.getValue().elementsIterable()) {
         delegateEvaluator.processElement(windowedValue);
       }
-      PriorityQueue<TimerData> toBeFiredTimers =
-          new PriorityQueue<>(Comparator.comparing(TimerData::getTimestamp));
 
-      Instant maxWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
-      Instant maxProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
-      Instant maxSynchronizedProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
       for (TimerData timerData : gbkResult.getValue().timersIterable()) {
-        toBeFiredTimers.add(timerData);
-        switch (timerData.getDomain()) {
-          case EVENT_TIME:
-            maxWatermarkTime = Ordering.natural().max(maxWatermarkTime, timerData.getTimestamp());
-            break;
-          case PROCESSING_TIME:
-            maxProcessingTime = Ordering.natural().max(maxProcessingTime, timerData.getTimestamp());
-            break;
-          case SYNCHRONIZED_PROCESSING_TIME:
-            maxSynchronizedProcessingTime =
-                Ordering.natural().max(maxSynchronizedProcessingTime, timerData.getTimestamp());
+        // Get any new or modified timers that are earlier than the current one. In order to
+        // maintain timer ordering,
+        // we need to fire these timers first.
+        NavigableSet<TimerData> earlierTimers =
+            timerInternals.getModifiedTimersOrdered(timerData.getDomain()).headSet(timerData, true);
+        while (!earlierTimers.isEmpty()) {
+          TimerData insertedTimer = earlierTimers.pollFirst();
+          if (timerModified(insertedTimer)) {
+            continue;
+          }
+          // Make sure to register this timer as deleted. This could be a timer that was originally
+          // set for the future
+          // and not in the bundle but was reset to an earlier time in this bundle. If we don't
+          // explicity delete the
+          // future timer, then it will still fire.
+          timerInternals.deleteTimer(insertedTimer);
+          processTimer(insertedTimer, gbkResult.getValue().key());
         }
-      }
 
-      while (!timerInternals.containsUpdateForTimeBefore(
-              maxWatermarkTime, maxProcessingTime, maxSynchronizedProcessingTime)
-          && !toBeFiredTimers.isEmpty()) {
-
-        TimerData timer = toBeFiredTimers.poll();
-        checkState(
-            timer.getNamespace() instanceof WindowNamespace,
-            "Expected Timer %s to be in a %s, but got %s",
-            timer,
-            WindowNamespace.class.getSimpleName(),
-            timer.getNamespace().getClass().getName());
-        WindowNamespace<?> windowNamespace = (WindowNamespace) timer.getNamespace();
-        BoundedWindow timerWindow = windowNamespace.getWindow();
-
-        delegateEvaluator.onTimer(timer, gbkResult.getValue().key(), timerWindow);
-        clearWatermarkHold(timer);
+        // As long as the timer hasn't been modified or deleted earlier in the bundle, fire it.
+        if (!timerModified(timerData)) {
+          processTimer(timerData, gbkResult.getValue().key());
+        }
       }
-      pushedBackTimers.addAll(toBeFiredTimers);
+    }
+
+    // Check to see if a timer has been modified inside this bundle.
+    private boolean timerModified(TimerData timerData) {
+      @Nullable
+      TimerData modifiedTimer = timerInternals.getModifiedTimerIds().get(timerData.stringKey());
+      return modifiedTimer != null && !modifiedTimer.equals(timerData);
+    }
+
+    private void processTimer(TimerData timerData, K key) throws Exception {
+      WindowNamespace<?> windowNamespace = (WindowNamespace) timerData.getNamespace();
+      BoundedWindow timerWindow = windowNamespace.getWindow();
+      delegateEvaluator.onTimer(timerData, key, timerWindow);
+      clearWatermarkHold(timerData);
     }
 
     private void clearWatermarkHold(TimerData timer) {
@@ -256,9 +251,7 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo
         watermarkHold = delegateResult.getWatermarkHold();
       }
 
-      TimerUpdate timerUpdate =
-          delegateResult.getTimerUpdate().withPushedBackTimers(pushedBackTimers);
-      pushedBackTimers.clear();
+      TimerUpdate timerUpdate = delegateResult.getTimerUpdate();
       StepTransformResult.Builder<KeyedWorkItem<K, KV<K, InputT>>> regroupedResult =
           StepTransformResult.<KeyedWorkItem<K, KV<K, InputT>>>withHold(
                   delegateResult.getTransform(), watermarkHold)
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index e6626d0..a68021e 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -26,17 +26,15 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Objects;
+import java.util.Queue;
 import java.util.Set;
 import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -45,7 +43,6 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import javax.annotation.Nonnull;
 import javax.annotation.concurrent.GuardedBy;
-import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.local.Bundle;
@@ -60,16 +57,14 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ComparisonChain;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashBasedTable;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Ordering;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Queues;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.SortedMultiset;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Table;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.TreeMultiset;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Instant;
@@ -239,7 +234,7 @@ class WatermarkManager<ExecutableT, CollectionT> {
 
     // Entries in this table represent the authoritative timestamp for which
     // a per-key-and-StateNamespace timer is set.
-    private final Map<StructuralKey<?>, Table<StateNamespace, String, TimerData>> existingTimers;
+    private final Map<StructuralKey<?>, Map<String, TimerData>> existingTimers;
 
     // This per-key sorted set allows quick retrieval of timers that should fire for a key
     private final Map<StructuralKey<?>, NavigableSet<TimerData>> objectTimers;
@@ -343,15 +338,14 @@ class WatermarkManager<ExecutableT, CollectionT> {
     synchronized void updateTimers(TimerUpdate update) {
       NavigableSet<TimerData> keyTimers =
           objectTimers.computeIfAbsent(update.key, k -> new TreeSet<>());
-      Table<StateNamespace, String, TimerData> existingTimersForKey =
-          existingTimers.computeIfAbsent(update.key, k -> HashBasedTable.create());
+      Map<String, TimerData> existingTimersForKey =
+          existingTimers.computeIfAbsent(update.key, k -> Maps.newHashMap());
 
+      HashSet<String> newSetTimers = Sets.newHashSet();
       for (TimerData timer : update.getSetTimers()) {
         if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
-          @Nullable
-          TimerData existingTimer =
-              existingTimersForKey.get(
-                  timer.getNamespace(), timer.getTimerId() + '+' + timer.getTimerFamilyId());
+          newSetTimers.add(timer.stringKey());
+          @Nullable TimerData existingTimer = existingTimersForKey.get(timer.stringKey());
 
           if (existingTimer == null) {
             pendingTimers.add(timer);
@@ -366,32 +360,29 @@ class WatermarkManager<ExecutableT, CollectionT> {
             keyTimers.add(timer);
           }
 
-          existingTimersForKey.put(
-              timer.getNamespace(), timer.getTimerId() + '+' + timer.getTimerFamilyId(), timer);
+          existingTimersForKey.put(timer.stringKey(), timer);
         }
       }
 
       for (TimerData timer : update.getDeletedTimers()) {
         if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
-          @Nullable
-          TimerData existingTimer =
-              existingTimersForKey.get(
-                  timer.getNamespace(), timer.getTimerId() + '+' + timer.getTimerFamilyId());
+          @Nullable TimerData existingTimer = existingTimersForKey.get(timer.stringKey());
 
           if (existingTimer != null) {
             pendingTimers.remove(existingTimer);
             keyTimers.remove(existingTimer);
-            existingTimersForKey.remove(
-                existingTimer.getNamespace(),
-                existingTimer.getTimerId() + '+' + existingTimer.getTimerFamilyId());
+            existingTimersForKey.remove(existingTimer.stringKey());
           }
         }
       }
 
       for (TimerData timer : update.getCompletedTimers()) {
         if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
-          keyTimers.remove(timer);
-          pendingTimers.remove(timer);
+          if (!newSetTimers.contains(timer.stringKey())) {
+            keyTimers.remove(timer);
+            pendingTimers.remove(timer);
+            existingTimersForKey.remove(timer.stringKey());
+          }
         }
       }
 
@@ -519,7 +510,7 @@ class WatermarkManager<ExecutableT, CollectionT> {
     private final Collection<Bundle<?, ?>> pendingBundles;
     private final Map<StructuralKey<?>, NavigableSet<TimerData>> processingTimers;
     private final Map<StructuralKey<?>, NavigableSet<TimerData>> synchronizedProcessingTimers;
-    private final Map<StructuralKey<?>, Table<StateNamespace, String, TimerData>> existingTimers;
+    private final Map<StructuralKey<?>, Map<String, TimerData>> existingTimers;
 
     private final NavigableSet<TimerData> pendingTimers;
 
@@ -629,21 +620,18 @@ class WatermarkManager<ExecutableT, CollectionT> {
     }
 
     private synchronized void updateTimers(TimerUpdate update) {
-      Map<TimeDomain, NavigableSet<TimerData>> timerMap = timerMap(update.key);
-      Table<StateNamespace, String, TimerData> existingTimersForKey =
-          existingTimers.computeIfAbsent(update.key, k -> HashBasedTable.create());
+      Map<String, TimerData> existingTimersForKey =
+          existingTimers.computeIfAbsent(update.key, k -> Maps.newHashMap());
 
+      HashSet<String> newSetTimers = Sets.newHashSet();
       for (TimerData addedTimer : update.setTimers.values()) {
-        NavigableSet<TimerData> timerQueue = timerMap.get(addedTimer.getDomain());
+        NavigableSet<TimerData> timerQueue =
+            processQueueForDomain(update.key, addedTimer.getDomain());
         if (timerQueue == null) {
           continue;
         }
-
-        @Nullable
-        TimerData existingTimer =
-            existingTimersForKey.get(
-                addedTimer.getNamespace(),
-                addedTimer.getTimerId() + '+' + addedTimer.getTimerFamilyId());
+        newSetTimers.add(addedTimer.stringKey());
+        @Nullable TimerData existingTimer = existingTimersForKey.get(addedTimer.stringKey());
         if (existingTimer == null) {
           timerQueue.add(addedTimer);
         } else if (!existingTimer.equals(addedTimer)) {
@@ -651,34 +639,30 @@ class WatermarkManager<ExecutableT, CollectionT> {
           timerQueue.add(addedTimer);
         } // else the timer is already set identically, so noop.
 
-        existingTimersForKey.put(
-            addedTimer.getNamespace(),
-            addedTimer.getTimerId() + '+' + addedTimer.getTimerFamilyId(),
-            addedTimer);
+        existingTimersForKey.put(addedTimer.stringKey(), addedTimer);
       }
 
       for (TimerData deletedTimer : update.deletedTimers.values()) {
-        NavigableSet<TimerData> timerQueue = timerMap.get(deletedTimer.getDomain());
+        NavigableSet<TimerData> timerQueue =
+            processQueueForDomain(update.key, deletedTimer.getDomain());
         if (timerQueue == null) {
           continue;
         }
-
-        @Nullable
-        TimerData existingTimer =
-            existingTimersForKey.get(
-                deletedTimer.getNamespace(),
-                deletedTimer.getTimerId() + '+' + deletedTimer.getTimerFamilyId());
+        String timerKey = deletedTimer.stringKey();
+        @Nullable TimerData existingTimer = existingTimersForKey.get(timerKey);
         if (existingTimer != null) {
           pendingTimers.remove(deletedTimer);
           timerQueue.remove(deletedTimer);
-          existingTimersForKey.remove(
-              existingTimer.getNamespace(),
-              existingTimer.getTimerId() + '+' + existingTimer.getTimerFamilyId());
+          existingTimersForKey.remove(timerKey);
         }
       }
 
       for (TimerData completedTimer : update.completedTimers) {
-        pendingTimers.remove(completedTimer);
+        String timerKey = completedTimer.stringKey();
+        if (!newSetTimers.contains(timerKey)) {
+          pendingTimers.remove(completedTimer);
+          existingTimersForKey.remove(timerKey);
+        }
       }
 
       // notify of TimerData update
@@ -713,15 +697,16 @@ class WatermarkManager<ExecutableT, CollectionT> {
       return firedTimers;
     }
 
-    private Map<TimeDomain, NavigableSet<TimerData>> timerMap(StructuralKey<?> key) {
-      NavigableSet<TimerData> processingQueue =
-          processingTimers.computeIfAbsent(key, k -> new TreeSet<>());
-      NavigableSet<TimerData> synchronizedProcessingQueue =
-          synchronizedProcessingTimers.computeIfAbsent(key, k -> new TreeSet<>());
-      EnumMap<TimeDomain, NavigableSet<TimerData>> result = new EnumMap<>(TimeDomain.class);
-      result.put(TimeDomain.PROCESSING_TIME, processingQueue);
-      result.put(TimeDomain.SYNCHRONIZED_PROCESSING_TIME, synchronizedProcessingQueue);
-      return result;
+    private @Nullable NavigableSet<TimerData> processQueueForDomain(
+        StructuralKey<?> key, TimeDomain timeDomain) {
+      switch (timeDomain) {
+        case PROCESSING_TIME:
+          return processingTimers.computeIfAbsent(key, k -> new TreeSet<>());
+        case SYNCHRONIZED_PROCESSING_TIME:
+          return synchronizedProcessingTimers.computeIfAbsent(key, k -> new TreeSet<>());
+        default:
+          return null;
+      }
     }
 
     @Override
@@ -853,7 +838,7 @@ class WatermarkManager<ExecutableT, CollectionT> {
    *
    * <p>The result collection retains ordering of timers (from earliest to latest).
    */
-  private static Map<StructuralKey<?>, List<TimerData>> extractFiredTimers(
+  private static synchronized Map<StructuralKey<?>, List<TimerData>> extractFiredTimers(
       Instant latestTime, Map<StructuralKey<?>, NavigableSet<TimerData>> objectTimers) {
     Map<StructuralKey<?>, List<TimerData>> result = new HashMap<>();
     Set<StructuralKey<?>> emptyKeys = new HashSet<>();
@@ -895,8 +880,7 @@ class WatermarkManager<ExecutableT, CollectionT> {
   private final Map<ExecutableT, TransformWatermarks> transformToWatermarks;
 
   /** A queue of pending updates to the state of this {@link WatermarkManager}. */
-  private final ConcurrentLinkedQueue<PendingWatermarkUpdate<ExecutableT, CollectionT>>
-      pendingUpdates;
+  private final Queue<PendingWatermarkUpdate<ExecutableT, CollectionT>> pendingUpdates;
 
   /** A lock used to control concurrency for updating pending values. */
   private final Lock refreshLock;
@@ -914,7 +898,7 @@ class WatermarkManager<ExecutableT, CollectionT> {
    * bundle processor at a time.
    */
   private final Map<ExecutableT, Set<String>> transformsWithAlreadyExtractedTimers =
-      new ConcurrentHashMap<>();
+      Maps.newHashMap();
 
   /**
    * Creates a new {@link WatermarkManager}. All watermarks within the newly created {@link
@@ -942,7 +926,7 @@ class WatermarkManager<ExecutableT, CollectionT> {
     this.graph = graph;
     this.getName = getName;
 
-    this.pendingUpdates = new ConcurrentLinkedQueue<>();
+    this.pendingUpdates = Queues.newArrayDeque();
 
     this.refreshLock = new ReentrantLock();
     this.pendingRefreshes = new HashSet<>();
@@ -1000,18 +984,20 @@ class WatermarkManager<ExecutableT, CollectionT> {
       Map<ExecutableT, Set<String>> transformsWithAlreadyExtractedTimers, ExecutableT executable) {
 
     return update -> {
-      String timerIdWithNs = TimerUpdate.getTimerIdAndTimerFamilyIdWithNamespace(update);
-      transformsWithAlreadyExtractedTimers.compute(
-          executable,
-          (k, v) -> {
-            if (v != null) {
-              v.remove(timerIdWithNs);
-              if (v.isEmpty()) {
-                v = null;
+      String timerIdWithNs = update.stringKey();
+      synchronized (transformsWithAlreadyExtractedTimers) {
+        transformsWithAlreadyExtractedTimers.compute(
+            executable,
+            (k, v) -> {
+              if (v != null) {
+                v.remove(timerIdWithNs);
+                if (v.isEmpty()) {
+                  v = null;
+                }
               }
-            }
-            return v;
-          });
+              return v;
+            });
+      }
     };
   }
 
@@ -1108,9 +1094,11 @@ class WatermarkManager<ExecutableT, CollectionT> {
       @Nullable Bundle<?, ? extends CollectionT> unprocessedInputs,
       Iterable<? extends Bundle<?, ? extends CollectionT>> outputs,
       Instant earliestHold) {
-    pendingUpdates.offer(
-        PendingWatermarkUpdate.create(
-            executable, completed, timerUpdate, unprocessedInputs, outputs, earliestHold));
+    synchronized (pendingUpdates) {
+      pendingUpdates.offer(
+          PendingWatermarkUpdate.create(
+              executable, completed, timerUpdate, unprocessedInputs, outputs, earliestHold));
+    }
     tryApplyPendingUpdates();
   }
 
@@ -1140,10 +1128,12 @@ class WatermarkManager<ExecutableT, CollectionT> {
   /** Applies up to {@code numUpdates}, or all available updates if numUpdates is non-positive. */
   @GuardedBy("refreshLock")
   private void applyNUpdates(int numUpdates) {
-    for (int i = 0; !pendingUpdates.isEmpty() && (i < numUpdates || numUpdates <= 0); i++) {
-      PendingWatermarkUpdate<ExecutableT, CollectionT> pending = pendingUpdates.poll();
-      applyPendingUpdate(pending);
-      pendingRefreshes.add(pending.getExecutable());
+    synchronized (pendingUpdates) {
+      for (int i = 0; !pendingUpdates.isEmpty() && ((i < numUpdates) || (numUpdates <= 0)); i++) {
+        PendingWatermarkUpdate<ExecutableT, CollectionT> pending = pendingUpdates.poll();
+        applyPendingUpdate(pending);
+        pendingRefreshes.add(pending.getExecutable());
+      }
     }
   }
 
@@ -1269,26 +1259,27 @@ class WatermarkManager<ExecutableT, CollectionT> {
         if (ignoredExecutables.contains(transform)) {
           continue;
         }
-        if (!transformsWithAlreadyExtractedTimers.containsKey(transform)) {
-          TransformWatermarks watermarks = watermarksEntry.getValue();
-          Collection<FiredTimers<ExecutableT>> firedTimers = watermarks.extractFiredTimers();
-          if (!firedTimers.isEmpty()) {
-            List<TimerData> newTimers =
-                firedTimers.stream()
-                    .flatMap(f -> f.getTimers().stream())
-                    .collect(Collectors.toList());
-            transformsWithAlreadyExtractedTimers.compute(
-                transform,
-                (k, v) -> {
-                  if (v == null) {
-                    v = new HashSet<>();
-                  }
-                  final Set<String> toUpdate = v;
-                  newTimers.forEach(
-                      td -> toUpdate.add(TimerUpdate.getTimerIdAndTimerFamilyIdWithNamespace(td)));
-                  return v;
-                });
-            allTimers.addAll(firedTimers);
+        synchronized (transformsWithAlreadyExtractedTimers) {
+          if (!transformsWithAlreadyExtractedTimers.containsKey(transform)) {
+            TransformWatermarks watermarks = watermarksEntry.getValue();
+            Collection<FiredTimers<ExecutableT>> firedTimers = watermarks.extractFiredTimers();
+            if (!firedTimers.isEmpty()) {
+              List<TimerData> newTimers =
+                  firedTimers.stream()
+                      .flatMap(f -> f.getTimers().stream())
+                      .collect(Collectors.toList());
+              transformsWithAlreadyExtractedTimers.compute(
+                  transform,
+                  (k, v) -> {
+                    if (v == null) {
+                      v = new HashSet<>();
+                    }
+                    final Set<String> toUpdate = v;
+                    newTimers.forEach(td -> toUpdate.add(td.stringKey()));
+                    return v;
+                  });
+              allTimers.addAll(firedTimers);
+            }
           }
         }
       }
@@ -1563,24 +1554,18 @@ class WatermarkManager<ExecutableT, CollectionT> {
    *
    * <p>setTimers and deletedTimers are collections of {@link TimerData} that have been added to the
    * {@link TimerInternals} of an executed step. completedTimers are timers that were delivered as
-   * the input to the executed step. pushedBackTimers are timers that were in completedTimers at the
-   * input, but were pushed back due to processing constraints.
+   * the input to the executed step.
    */
   public static class TimerUpdate {
     private final StructuralKey<?> key;
     private final Iterable<? extends TimerData> completedTimers;
     private final Map<TimerKey, ? extends TimerData> setTimers;
     private final Map<TimerKey, ? extends TimerData> deletedTimers;
-    private final Iterable<? extends TimerData> pushedBackTimers;
 
     /** Returns a TimerUpdate for a null key with no timers. */
     public static TimerUpdate empty() {
       return new TimerUpdate(
-          null,
-          Collections.emptyList(),
-          Collections.emptyMap(),
-          Collections.emptyMap(),
-          Collections.emptyList());
+          null, Collections.emptyList(), Collections.emptyMap(), Collections.emptyMap());
     }
 
     /**
@@ -1651,26 +1636,19 @@ class WatermarkManager<ExecutableT, CollectionT> {
             key,
             ImmutableList.copyOf(completedTimers),
             ImmutableMap.copyOf(setTimers),
-            ImmutableMap.copyOf(deletedTimers),
-            Collections.emptyList());
+            ImmutableMap.copyOf(deletedTimers));
       }
     }
 
-    private static String getTimerIdAndTimerFamilyIdWithNamespace(TimerData td) {
-      return td.getNamespace() + td.getTimerId() + td.getTimerFamilyId();
-    }
-
     private TimerUpdate(
         StructuralKey<?> key,
         Iterable<? extends TimerData> completedTimers,
         Map<TimerKey, ? extends TimerData> setTimers,
-        Map<TimerKey, ? extends TimerData> deletedTimers,
-        Iterable<? extends TimerData> pushedBackTimers) {
+        Map<TimerKey, ? extends TimerData> deletedTimers) {
       this.key = key;
       this.completedTimers = completedTimers;
       this.setTimers = setTimers;
       this.deletedTimers = deletedTimers;
-      this.pushedBackTimers = pushedBackTimers;
     }
 
     @VisibleForTesting
@@ -1693,46 +1671,21 @@ class WatermarkManager<ExecutableT, CollectionT> {
       return deletedTimers.values();
     }
 
-    Iterable<? extends TimerData> getPushedBackTimers() {
-      return pushedBackTimers;
-    }
-
     boolean isEmpty() {
-      return Iterables.isEmpty(completedTimers)
-          && setTimers.isEmpty()
-          && deletedTimers.isEmpty()
-          && Iterables.isEmpty(pushedBackTimers);
+      return Iterables.isEmpty(completedTimers) && setTimers.isEmpty() && deletedTimers.isEmpty();
     }
 
     /**
      * Returns a {@link TimerUpdate} that is like this one, but with the specified completed timers.
-     * Note that if any of the completed timers is in pushedBackTimers, then it is set instead. The
-     * pushedBackTimers are cleared afterwards.
      */
     public TimerUpdate withCompletedTimers(Iterable<TimerData> completedTimers) {
       List<TimerData> timersToComplete = new ArrayList<>();
-      Set<TimerData> pushedBack = Sets.newHashSet(pushedBackTimers);
       Map<TimerKey, TimerData> newSetTimers = Maps.newLinkedHashMap();
       newSetTimers.putAll(setTimers);
       for (TimerData td : completedTimers) {
-        TimerKey timerKey = TimerKey.of(td);
-        if (!pushedBack.contains(td)) {
-          timersToComplete.add(td);
-        } else if (!newSetTimers.containsKey(timerKey)) {
-          newSetTimers.put(timerKey, td);
-        }
+        timersToComplete.add(td);
       }
-      return new TimerUpdate(
-          key, timersToComplete, newSetTimers, deletedTimers, Collections.emptyList());
-    }
-
-    /**
-     * Returns a {@link TimerUpdate} that is like this one, but with the pushedBackTimersare removed
-     * set by provided pushedBackTimers.
-     */
-    public TimerUpdate withPushedBackTimers(Iterable<TimerData> pushedBackTimers) {
-      return new TimerUpdate(
-          key, completedTimers, setTimers, deletedTimers, Lists.newArrayList(pushedBackTimers));
+      return new TimerUpdate(key, timersToComplete, newSetTimers, deletedTimers);
     }
 
     @Override
@@ -1759,7 +1712,6 @@ class WatermarkManager<ExecutableT, CollectionT> {
           .add("setTimers", setTimers)
           .add("completedTimers", completedTimers)
           .add("deletedTimers", deletedTimers)
-          .add("pushedBackTimers", pushedBackTimers)
           .toString();
     }
   }
diff --git a/runners/local-java/src/main/java/org/apache/beam/runners/local/ExecutionDriver.java b/runners/local-java/src/main/java/org/apache/beam/runners/local/ExecutionDriver.java
index c17dc54..3638e4e 100644
--- a/runners/local-java/src/main/java/org/apache/beam/runners/local/ExecutionDriver.java
+++ b/runners/local-java/src/main/java/org/apache/beam/runners/local/ExecutionDriver.java
@@ -33,7 +33,7 @@ public interface ExecutionDriver {
       this.terminal = terminal;
     }
 
-    public boolean isTermainal() {
+    public boolean isTerminal() {
       return terminal;
     }
   }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index fc80127..6f603a2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -4121,7 +4121,8 @@ public class ParDoTest implements Serializable {
       ValidatesRunner.class,
       UsesStatefulParDo.class,
       UsesTimersInParDo.class,
-      UsesLoopingTimer.class
+      UsesLoopingTimer.class,
+      UsesStrictTimerOrdering.class
     })
     public void testEventTimeTimerLoop() {
       final String stateId = "count";
@@ -4141,6 +4142,7 @@ public class ParDoTest implements Serializable {
             public void processElement(
                 @StateId(stateId) ValueState<Integer> countState,
                 @TimerId(timerId) Timer loopTimer) {
+              countState.write(0);
               loopTimer.offset(Duration.millis(1)).setRelative();
             }
 
@@ -4149,7 +4151,7 @@ public class ParDoTest implements Serializable {
                 @StateId(stateId) ValueState<Integer> countState,
                 @TimerId(timerId) Timer loopTimer,
                 OutputReceiver<Integer> r) {
-              int count = MoreObjects.firstNonNull(countState.read(), 0);
+              int count = Preconditions.checkNotNull(countState.read());
               if (count < loopCount) {
                 r.output(count);
                 countState.write(count + 1);
@@ -4159,7 +4161,9 @@ public class ParDoTest implements Serializable {
           };
 
       PCollection<Integer> output =
-          pipeline.apply(Create.of(KV.of("hello", 42))).apply(ParDo.of(fn));
+          pipeline
+              .apply(Create.of(KV.of("hello1", 42), KV.of("hello2", 42), KV.of("hello3", 42)))
+              .apply(ParDo.of(fn));
 
       PAssert.that(output).containsInAnyOrder(0, 1, 2, 3, 4);
       pipeline.run();