You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/25 18:55:55 UTC

[41/50] incubator-beam git commit: Make TimerData#compareTo consistent with equals

Make TimerData#compareTo consistent with equals

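Timers are equal if the domain, timestamp, and namespace are equal.
Previously compareTo compared only the timestamp, so timers that differed
only in domain or namespace compared as equal even though equals returned
false. Compare all three values in compareTo instead. Timers are still
ordered by timestamp first; the relative ordering of timers that share a
timestamp but are not in the same namespace or domain is arbitrary.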


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9fe6ce22
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9fe6ce22
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9fe6ce22

Branch: refs/heads/gearpump-runner
Commit: 9fe6ce22ca12b48704deb0e7cf3c583dff9b1870
Parents: fc87a0c
Author: Thomas Groh <tg...@google.com>
Authored: Wed Aug 10 13:52:14 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Aug 10 13:55:54 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/util/TimerInternals.java    | 15 ++++++-
 .../beam/sdk/util/TimerInternalsTest.java       | 47 ++++++++++++++++++++
 2 files changed, 61 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9fe6ce22/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
index 3212d64..eb49b9d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
@@ -30,6 +30,7 @@ import org.apache.beam.sdk.util.state.StateNamespace;
 import org.apache.beam.sdk.util.state.StateNamespaces;
 
 import com.google.common.base.MoreObjects;
+import com.google.common.collect.ComparisonChain;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -205,9 +206,21 @@ public interface TimerInternals {
           .toString();
     }
 
+    /**
+     * {@inheritDoc}.
+     *
+     * <p>The ordering of {@link TimerData} that are not in the same namespace or domain is
+     * arbitrary.
+     */
     @Override
     public int compareTo(TimerData o) {
-      return Long.compare(timestamp.getMillis(), o.getTimestamp().getMillis());
+      ComparisonChain chain =
+          ComparisonChain.start().compare(timestamp, o.getTimestamp()).compare(domain, o.domain);
+      if (chain.result() == 0) {
+        // Obtaining the stringKey may be expensive; only do so if required
+        chain = chain.compare(namespace.stringKey(), o.namespace.stringKey());
+      }
+      return chain.result();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9fe6ce22/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TimerInternalsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TimerInternalsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TimerInternalsTest.java
index baf911a..bc2930c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TimerInternalsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TimerInternalsTest.java
@@ -17,12 +17,18 @@
  */
 package org.apache.beam.sdk.util;
 
+import static org.hamcrest.Matchers.comparesEqualTo;
+import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.testing.CoderProperties;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.TimerInternals.TimerDataCoder;
+import org.apache.beam.sdk.util.state.StateNamespace;
 import org.apache.beam.sdk.util.state.StateNamespaces;
 
 import org.joda.time.Instant;
@@ -50,4 +56,45 @@ public class TimerInternalsTest {
                 windowCoder, new IntervalWindow(new Instant(0), new Instant(100))),
             new Instant(99), TimeDomain.PROCESSING_TIME));
   }
+
+  @Test
+  public void testCompareTo() {
+    Instant firstTimestamp = new Instant(100);
+    Instant secondTimestamp = new Instant(200);
+    IntervalWindow firstWindow = new IntervalWindow(new Instant(0), firstTimestamp);
+    IntervalWindow secondWindow =  new IntervalWindow(firstTimestamp, secondTimestamp);
+    Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder();
+
+    StateNamespace firstWindowNs = StateNamespaces.window(windowCoder, firstWindow);
+    StateNamespace secondWindowNs = StateNamespaces.window(windowCoder, secondWindow);
+
+    TimerData firstEventTime = TimerData.of(firstWindowNs, firstTimestamp, TimeDomain.EVENT_TIME);
+    TimerData secondEventTime = TimerData.of(firstWindowNs, secondTimestamp, TimeDomain.EVENT_TIME);
+    TimerData thirdEventTime = TimerData.of(secondWindowNs, secondTimestamp, TimeDomain.EVENT_TIME);
+
+    TimerData firstProcTime =
+        TimerData.of(firstWindowNs, firstTimestamp, TimeDomain.PROCESSING_TIME);
+    TimerData secondProcTime =
+        TimerData.of(firstWindowNs, secondTimestamp, TimeDomain.PROCESSING_TIME);
+    TimerData thirdProcTime =
+        TimerData.of(secondWindowNs, secondTimestamp, TimeDomain.PROCESSING_TIME);
+
+    assertThat(firstEventTime,
+        comparesEqualTo(TimerData.of(firstWindowNs, firstTimestamp, TimeDomain.EVENT_TIME)));
+    assertThat(firstEventTime, lessThan(secondEventTime));
+    assertThat(secondEventTime, lessThan(thirdEventTime));
+    assertThat(firstEventTime, lessThan(thirdEventTime));
+
+    assertThat(secondProcTime,
+        comparesEqualTo(TimerData.of(firstWindowNs, secondTimestamp, TimeDomain.PROCESSING_TIME)));
+    assertThat(firstProcTime, lessThan(secondProcTime));
+    assertThat(secondProcTime, lessThan(thirdProcTime));
+    assertThat(firstProcTime, lessThan(thirdProcTime));
+
+    assertThat(firstEventTime, not(comparesEqualTo(firstProcTime)));
+    assertThat(firstProcTime,
+        not(comparesEqualTo(TimerData.of(firstWindowNs,
+            firstTimestamp,
+            TimeDomain.SYNCHRONIZED_PROCESSING_TIME))));
+  }
 }