You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/21 22:50:03 UTC

[29/51] [abbrv] incubator-beam git commit: Remove deprecated InMemoryTimerInternals from SDK

Remove deprecated InMemoryTimerInternals from SDK


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9f1d3d15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9f1d3d15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9f1d3d15

Branch: refs/heads/python-sdk
Commit: 9f1d3d155303bd3d1069541be704d5f3e74926eb
Parents: 6a05d7f
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Dec 20 14:07:00 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Dec 20 15:16:32 2016 -0800

----------------------------------------------------------------------
 .../sdk/util/state/InMemoryTimerInternals.java  | 275 -------------------
 1 file changed, 275 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9f1d3d15/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
deleted file mode 100644
index a910d64..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
+++ /dev/null
@@ -1,275 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.common.base.MoreObjects;
-import java.util.HashSet;
-import java.util.PriorityQueue;
-import java.util.Set;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.TimerInternals;
-import org.apache.beam.sdk.util.WindowTracing;
-import org.joda.time.Instant;
-
-/**
- * @deprecated use {@code org.apache.beam.runners.core.InMemoryTimerInternals}.
- */
-@Deprecated
-public class InMemoryTimerInternals implements TimerInternals {
-
-  /** At most one timer per timestamp is kept. */
-  private Set<TimerData> existingTimers = new HashSet<>();
-
-  /** Pending input watermark timers, in timestamp order. */
-  private PriorityQueue<TimerData> watermarkTimers = new PriorityQueue<>(11);
-
-  /** Pending processing time timers, in timestamp order. */
-  private PriorityQueue<TimerData> processingTimers = new PriorityQueue<>(11);
-
-  /** Pending synchronized processing time timers, in timestamp order. */
-  private PriorityQueue<TimerData> synchronizedProcessingTimers = new PriorityQueue<>(11);
-
-  /** Current input watermark. */
-  private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
-
-  /** Current output watermark. */
-  @Nullable private Instant outputWatermarkTime = null;
-
-  /** Current processing time. */
-  private Instant processingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
-
-  /** Current synchronized processing time. */
-  private Instant synchronizedProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
-
-  @Override
-  @Nullable
-  public Instant currentOutputWatermarkTime() {
-    return outputWatermarkTime;
-  }
-
-  /**
-   * Returns when the next timer in the given time domain will fire, or {@code null}
-   * if there are no timers scheduled in that time domain.
-   */
-  @Nullable
-  public Instant getNextTimer(TimeDomain domain) {
-    final TimerData data;
-    switch (domain) {
-      case EVENT_TIME:
-        data = watermarkTimers.peek();
-        break;
-      case PROCESSING_TIME:
-        data = processingTimers.peek();
-        break;
-      case SYNCHRONIZED_PROCESSING_TIME:
-        data = synchronizedProcessingTimers.peek();
-        break;
-      default:
-        throw new IllegalArgumentException("Unexpected time domain: " + domain);
-    }
-    return (data == null) ? null : data.getTimestamp();
-  }
-
-  private PriorityQueue<TimerData> queue(TimeDomain domain) {
-    switch (domain) {
-      case EVENT_TIME:
-        return watermarkTimers;
-      case PROCESSING_TIME:
-        return processingTimers;
-      case SYNCHRONIZED_PROCESSING_TIME:
-        return synchronizedProcessingTimers;
-      default:
-        throw new IllegalArgumentException("Unexpected time domain: " + domain);
-    }
-  }
-
-  @Override
-  public void setTimer(StateNamespace namespace, String timerId, Instant target,
-      TimeDomain timeDomain) {
-    throw new UnsupportedOperationException("Setting a timer by ID is not yet supported.");
-  }
-
-  @Override
-  public void setTimer(TimerData timerData) {
-    WindowTracing.trace("{}.setTimer: {}", getClass().getSimpleName(), timerData);
-    if (existingTimers.add(timerData)) {
-      queue(timerData.getDomain()).add(timerData);
-    }
-  }
-
-  @Override
-  public void deleteTimer(StateNamespace namespace, String timerId) {
-    throw new UnsupportedOperationException("Canceling a timer by ID is not yet supported.");
-  }
-
-  @Override
-  public void deleteTimer(TimerData timer) {
-    WindowTracing.trace("{}.deleteTimer: {}", getClass().getSimpleName(), timer);
-    existingTimers.remove(timer);
-    queue(timer.getDomain()).remove(timer);
-  }
-
-  @Override
-  public Instant currentProcessingTime() {
-    return processingTime;
-  }
-
-  @Override
-  @Nullable
-  public Instant currentSynchronizedProcessingTime() {
-    return synchronizedProcessingTime;
-  }
-
-  @Override
-  public Instant currentInputWatermarkTime() {
-    return inputWatermarkTime;
-  }
-
-  @Override
-  public String toString() {
-    return MoreObjects.toStringHelper(getClass())
-        .add("watermarkTimers", watermarkTimers)
-        .add("processingTimers", processingTimers)
-        .add("synchronizedProcessingTimers", synchronizedProcessingTimers)
-        .add("inputWatermarkTime", inputWatermarkTime)
-        .add("outputWatermarkTime", outputWatermarkTime)
-        .add("processingTime", processingTime)
-        .toString();
-  }
-
-  /** Advances input watermark to the given value. */
-  public void advanceInputWatermark(Instant newInputWatermark) throws Exception {
-    checkNotNull(newInputWatermark);
-    checkState(
-        !newInputWatermark.isBefore(inputWatermarkTime),
-        "Cannot move input watermark time backwards from %s to %s",
-        inputWatermarkTime,
-        newInputWatermark);
-    WindowTracing.trace(
-        "{}.advanceInputWatermark: from {} to {}",
-        getClass().getSimpleName(), inputWatermarkTime, newInputWatermark);
-    inputWatermarkTime = newInputWatermark;
-  }
-
-  /** Advances output watermark to the given value. */
-  public void advanceOutputWatermark(Instant newOutputWatermark) {
-    checkNotNull(newOutputWatermark);
-    final Instant adjustedOutputWatermark;
-    if (newOutputWatermark.isAfter(inputWatermarkTime)) {
-      WindowTracing.trace(
-          "{}.advanceOutputWatermark: clipping output watermark from {} to {}",
-          getClass().getSimpleName(), newOutputWatermark, inputWatermarkTime);
-      adjustedOutputWatermark = inputWatermarkTime;
-    } else {
-      adjustedOutputWatermark = newOutputWatermark;
-    }
-
-    checkState(
-        outputWatermarkTime == null || !adjustedOutputWatermark.isBefore(outputWatermarkTime),
-        "Cannot move output watermark time backwards from %s to %s",
-        outputWatermarkTime,
-        adjustedOutputWatermark);
-    WindowTracing.trace(
-        "{}.advanceOutputWatermark: from {} to {}",
-        getClass().getSimpleName(), outputWatermarkTime, adjustedOutputWatermark);
-    outputWatermarkTime = adjustedOutputWatermark;
-  }
-
-  /** Advances processing time to the given value. */
-  public void advanceProcessingTime(Instant newProcessingTime) throws Exception {
-    checkNotNull(newProcessingTime);
-    checkState(
-        !newProcessingTime.isBefore(processingTime),
-        "Cannot move processing time backwards from %s to %s",
-        processingTime,
-        newProcessingTime);
-    WindowTracing.trace(
-        "{}.advanceProcessingTime: from {} to {}",
-        getClass().getSimpleName(), processingTime, newProcessingTime);
-    processingTime = newProcessingTime;
-  }
-
-  /** Advances synchronized processing time to the given value. */
-  public void advanceSynchronizedProcessingTime(Instant newSynchronizedProcessingTime)
-      throws Exception {
-    checkNotNull(newSynchronizedProcessingTime);
-    checkState(
-        !newSynchronizedProcessingTime.isBefore(synchronizedProcessingTime),
-        "Cannot move processing time backwards from %s to %s",
-        synchronizedProcessingTime,
-        newSynchronizedProcessingTime);
-    WindowTracing.trace(
-        "{}.advanceProcessingTime: from {} to {}",
-        getClass().getSimpleName(), synchronizedProcessingTime, newSynchronizedProcessingTime);
-    synchronizedProcessingTime = newSynchronizedProcessingTime;
-  }
-
-  /** Returns the next eligible event time timer, if none returns null. */
-  @Nullable
-  public TimerData removeNextEventTimer() {
-    TimerData timer = removeNextTimer(inputWatermarkTime, TimeDomain.EVENT_TIME);
-    if (timer != null) {
-      WindowTracing.trace(
-          "{}.removeNextEventTimer: firing {} at {}",
-          getClass().getSimpleName(), timer, inputWatermarkTime);
-    }
-    return timer;
-  }
-
-  /** Returns the next eligible processing time timer, if none returns null. */
-  @Nullable
-  public TimerData removeNextProcessingTimer() {
-    TimerData timer = removeNextTimer(processingTime, TimeDomain.PROCESSING_TIME);
-    if (timer != null) {
-      WindowTracing.trace(
-          "{}.removeNextProcessingTimer: firing {} at {}",
-          getClass().getSimpleName(), timer, processingTime);
-    }
-    return timer;
-  }
-
-  /** Returns the next eligible synchronized processing time timer, if none returns null. */
-  @Nullable
-  public TimerData removeNextSynchronizedProcessingTimer() {
-    TimerData timer = removeNextTimer(
-        synchronizedProcessingTime, TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
-    if (timer != null) {
-      WindowTracing.trace(
-          "{}.removeNextSynchronizedProcessingTimer: firing {} at {}",
-          getClass().getSimpleName(), timer, synchronizedProcessingTime);
-    }
-    return timer;
-  }
-
-  @Nullable
-  private TimerData removeNextTimer(Instant currentTime, TimeDomain domain) {
-    PriorityQueue<TimerData> queue = queue(domain);
-    if (!queue.isEmpty() && currentTime.isAfter(queue.peek().getTimestamp())) {
-      TimerData timer = queue.remove();
-      existingTimers.remove(timer);
-      return timer;
-    } else {
-      return null;
-    }
-  }
-}