You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/21 22:49:53 UTC
[19/51] [abbrv] incubator-beam git commit: Restore SDK's
InMemoryTimerInternals, deprecated
Restore SDK's InMemoryTimerInternals, deprecated
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/69d2c47b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/69d2c47b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/69d2c47b
Branch: refs/heads/python-sdk
Commit: 69d2c47b6a476099535e9cefe62d4cce5ccafbc1
Parents: 445c120
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Dec 16 20:22:59 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Dec 20 11:21:52 2016 -0800
----------------------------------------------------------------------
.../sdk/util/state/InMemoryTimerInternals.java | 275 +++++++++++++++++++
1 file changed, 275 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/69d2c47b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
new file mode 100644
index 0000000..a910d64
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util.state;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.base.MoreObjects;
+import java.util.HashSet;
+import java.util.PriorityQueue;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowTracing;
+import org.joda.time.Instant;
+
+/**
+ * @deprecated use {@code org.apache.beam.runners.core.InMemoryTimerInternals}.
+ */
+@Deprecated
+public class InMemoryTimerInternals implements TimerInternals {
+
+ /** At most one timer per timestamp is kept. */
+ private Set<TimerData> existingTimers = new HashSet<>();
+
+ /** Pending input watermark timers, in timestamp order. */
+ private PriorityQueue<TimerData> watermarkTimers = new PriorityQueue<>(11);
+
+ /** Pending processing time timers, in timestamp order. */
+ private PriorityQueue<TimerData> processingTimers = new PriorityQueue<>(11);
+
+ /** Pending synchronized processing time timers, in timestamp order. */
+ private PriorityQueue<TimerData> synchronizedProcessingTimers = new PriorityQueue<>(11);
+
+ /** Current input watermark. */
+ private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+ /** Current output watermark. */
+ @Nullable private Instant outputWatermarkTime = null;
+
+ /** Current processing time. */
+ private Instant processingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+ /** Current synchronized processing time. */
+ private Instant synchronizedProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
+
+ @Override
+ @Nullable
+ public Instant currentOutputWatermarkTime() {
+ return outputWatermarkTime;
+ }
+
+ /**
+ * Returns when the next timer in the given time domain will fire, or {@code null}
+ * if there are no timers scheduled in that time domain.
+ */
+ @Nullable
+ public Instant getNextTimer(TimeDomain domain) {
+ final TimerData data;
+ switch (domain) {
+ case EVENT_TIME:
+ data = watermarkTimers.peek();
+ break;
+ case PROCESSING_TIME:
+ data = processingTimers.peek();
+ break;
+ case SYNCHRONIZED_PROCESSING_TIME:
+ data = synchronizedProcessingTimers.peek();
+ break;
+ default:
+ throw new IllegalArgumentException("Unexpected time domain: " + domain);
+ }
+ return (data == null) ? null : data.getTimestamp();
+ }
+
+ private PriorityQueue<TimerData> queue(TimeDomain domain) {
+ switch (domain) {
+ case EVENT_TIME:
+ return watermarkTimers;
+ case PROCESSING_TIME:
+ return processingTimers;
+ case SYNCHRONIZED_PROCESSING_TIME:
+ return synchronizedProcessingTimers;
+ default:
+ throw new IllegalArgumentException("Unexpected time domain: " + domain);
+ }
+ }
+
+ @Override
+ public void setTimer(StateNamespace namespace, String timerId, Instant target,
+ TimeDomain timeDomain) {
+ throw new UnsupportedOperationException("Setting a timer by ID is not yet supported.");
+ }
+
+ @Override
+ public void setTimer(TimerData timerData) {
+ WindowTracing.trace("{}.setTimer: {}", getClass().getSimpleName(), timerData);
+ if (existingTimers.add(timerData)) {
+ queue(timerData.getDomain()).add(timerData);
+ }
+ }
+
+ @Override
+ public void deleteTimer(StateNamespace namespace, String timerId) {
+ throw new UnsupportedOperationException("Canceling a timer by ID is not yet supported.");
+ }
+
+ @Override
+ public void deleteTimer(TimerData timer) {
+ WindowTracing.trace("{}.deleteTimer: {}", getClass().getSimpleName(), timer);
+ existingTimers.remove(timer);
+ queue(timer.getDomain()).remove(timer);
+ }
+
+ @Override
+ public Instant currentProcessingTime() {
+ return processingTime;
+ }
+
+ @Override
+ @Nullable
+ public Instant currentSynchronizedProcessingTime() {
+ return synchronizedProcessingTime;
+ }
+
+ @Override
+ public Instant currentInputWatermarkTime() {
+ return inputWatermarkTime;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("watermarkTimers", watermarkTimers)
+ .add("processingTimers", processingTimers)
+ .add("synchronizedProcessingTimers", synchronizedProcessingTimers)
+ .add("inputWatermarkTime", inputWatermarkTime)
+ .add("outputWatermarkTime", outputWatermarkTime)
+ .add("processingTime", processingTime)
+ .toString();
+ }
+
+ /** Advances input watermark to the given value. */
+ public void advanceInputWatermark(Instant newInputWatermark) throws Exception {
+ checkNotNull(newInputWatermark);
+ checkState(
+ !newInputWatermark.isBefore(inputWatermarkTime),
+ "Cannot move input watermark time backwards from %s to %s",
+ inputWatermarkTime,
+ newInputWatermark);
+ WindowTracing.trace(
+ "{}.advanceInputWatermark: from {} to {}",
+ getClass().getSimpleName(), inputWatermarkTime, newInputWatermark);
+ inputWatermarkTime = newInputWatermark;
+ }
+
+ /** Advances output watermark to the given value. */
+ public void advanceOutputWatermark(Instant newOutputWatermark) {
+ checkNotNull(newOutputWatermark);
+ final Instant adjustedOutputWatermark;
+ if (newOutputWatermark.isAfter(inputWatermarkTime)) {
+ WindowTracing.trace(
+ "{}.advanceOutputWatermark: clipping output watermark from {} to {}",
+ getClass().getSimpleName(), newOutputWatermark, inputWatermarkTime);
+ adjustedOutputWatermark = inputWatermarkTime;
+ } else {
+ adjustedOutputWatermark = newOutputWatermark;
+ }
+
+ checkState(
+ outputWatermarkTime == null || !adjustedOutputWatermark.isBefore(outputWatermarkTime),
+ "Cannot move output watermark time backwards from %s to %s",
+ outputWatermarkTime,
+ adjustedOutputWatermark);
+ WindowTracing.trace(
+ "{}.advanceOutputWatermark: from {} to {}",
+ getClass().getSimpleName(), outputWatermarkTime, adjustedOutputWatermark);
+ outputWatermarkTime = adjustedOutputWatermark;
+ }
+
+ /** Advances processing time to the given value. */
+ public void advanceProcessingTime(Instant newProcessingTime) throws Exception {
+ checkNotNull(newProcessingTime);
+ checkState(
+ !newProcessingTime.isBefore(processingTime),
+ "Cannot move processing time backwards from %s to %s",
+ processingTime,
+ newProcessingTime);
+ WindowTracing.trace(
+ "{}.advanceProcessingTime: from {} to {}",
+ getClass().getSimpleName(), processingTime, newProcessingTime);
+ processingTime = newProcessingTime;
+ }
+
+ /** Advances synchronized processing time to the given value. */
+ public void advanceSynchronizedProcessingTime(Instant newSynchronizedProcessingTime)
+ throws Exception {
+ checkNotNull(newSynchronizedProcessingTime);
+ checkState(
+ !newSynchronizedProcessingTime.isBefore(synchronizedProcessingTime),
+ "Cannot move processing time backwards from %s to %s",
+ synchronizedProcessingTime,
+ newSynchronizedProcessingTime);
+ WindowTracing.trace(
+ "{}.advanceProcessingTime: from {} to {}",
+ getClass().getSimpleName(), synchronizedProcessingTime, newSynchronizedProcessingTime);
+ synchronizedProcessingTime = newSynchronizedProcessingTime;
+ }
+
+ /** Returns the next eligible event time timer, if none returns null. */
+ @Nullable
+ public TimerData removeNextEventTimer() {
+ TimerData timer = removeNextTimer(inputWatermarkTime, TimeDomain.EVENT_TIME);
+ if (timer != null) {
+ WindowTracing.trace(
+ "{}.removeNextEventTimer: firing {} at {}",
+ getClass().getSimpleName(), timer, inputWatermarkTime);
+ }
+ return timer;
+ }
+
+ /** Returns the next eligible processing time timer, if none returns null. */
+ @Nullable
+ public TimerData removeNextProcessingTimer() {
+ TimerData timer = removeNextTimer(processingTime, TimeDomain.PROCESSING_TIME);
+ if (timer != null) {
+ WindowTracing.trace(
+ "{}.removeNextProcessingTimer: firing {} at {}",
+ getClass().getSimpleName(), timer, processingTime);
+ }
+ return timer;
+ }
+
+ /** Returns the next eligible synchronized processing time timer, if none returns null. */
+ @Nullable
+ public TimerData removeNextSynchronizedProcessingTimer() {
+ TimerData timer = removeNextTimer(
+ synchronizedProcessingTime, TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+ if (timer != null) {
+ WindowTracing.trace(
+ "{}.removeNextSynchronizedProcessingTimer: firing {} at {}",
+ getClass().getSimpleName(), timer, synchronizedProcessingTime);
+ }
+ return timer;
+ }
+
+ @Nullable
+ private TimerData removeNextTimer(Instant currentTime, TimeDomain domain) {
+ PriorityQueue<TimerData> queue = queue(domain);
+ if (!queue.isEmpty() && currentTime.isAfter(queue.peek().getTimestamp())) {
+ TimerData timer = queue.remove();
+ existingTimers.remove(timer);
+ return timer;
+ } else {
+ return null;
+ }
+ }
+}