You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/08/27 16:07:47 UTC

[beam] branch master updated: [BEAM-10820] Delete dead code SDFFeederViaStateAndTimers.java

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new aacfec9  [BEAM-10820] Delete dead code SDFFeederViaStateAndTimers.java
     new 9befacb  Merge pull request #12691 from lukecwik/beam10820
aacfec9 is described below

commit aacfec90ab29acf2e2188e325e8e8881966de686
Author: Luke Cwik <lc...@google.com>
AuthorDate: Wed Aug 26 13:58:48 2020 -0700

    [BEAM-10820] Delete dead code SDFFeederViaStateAndTimers.java
---
 .../splittabledofn/SDFFeederViaStateAndTimers.java | 193 ---------------------
 .../fnexecution/splittabledofn/package-info.java   |  20 ---
 2 files changed, 213 deletions(-)

diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/splittabledofn/SDFFeederViaStateAndTimers.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/splittabledofn/SDFFeederViaStateAndTimers.java
deleted file mode 100644
index d4edbd2..0000000
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/splittabledofn/SDFFeederViaStateAndTimers.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.fnexecution.splittabledofn;
-
-import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
-import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
-
-import java.io.IOException;
-import java.util.List;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.BundleApplication;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.DelayedBundleApplication;
-import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateNamespaces;
-import org.apache.beam.runners.core.StateTag;
-import org.apache.beam.runners.core.StateTags;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.runners.core.construction.PTransformTranslation;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.state.TimeDomain;
-import org.apache.beam.sdk.state.ValueState;
-import org.apache.beam.sdk.state.WatermarkHoldState;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
-import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.util.Durations;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
-import org.joda.time.Instant;
-
-/**
- * Helper class for feeding element/restricton pairs into a {@link
- * PTransformTranslation#SPLITTABLE_PROCESS_ELEMENTS_URN} transform, implementing checkpointing
- * only, by using state and timers for storing the last element/restriction pair, similarly to
- * {@link org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessFn} but in a portable
- * fashion.
- */
-public class SDFFeederViaStateAndTimers<InputT, RestrictionT> {
-  private final Coder<BoundedWindow> windowCoder;
-  private final Coder<WindowedValue<KV<InputT, RestrictionT>>> elementRestrictionWireCoder;
-
-  private final StateInternals stateInternals;
-  private final TimerInternals timerInternals;
-
-  private StateNamespace stateNamespace;
-
-  private final StateTag<ValueState<WindowedValue<KV<InputT, RestrictionT>>>> seedTag;
-  private ValueState<WindowedValue<KV<InputT, RestrictionT>>> seedState;
-
-  private final StateTag<ValueState<RestrictionT>> restrictionTag;
-  private ValueState<RestrictionT> restrictionState;
-
-  private StateTag<WatermarkHoldState> watermarkHoldTag =
-      StateTags.makeSystemTagInternal(
-          StateTags.<GlobalWindow>watermarkStateInternal("hold", TimestampCombiner.LATEST));
-  private WatermarkHoldState holdState;
-
-  private Instant inputTimestamp;
-  private List<BundleApplication> primaryRoots;
-  private List<DelayedBundleApplication> residualRoots;
-
-  /** Initializes the feeder. */
-  public SDFFeederViaStateAndTimers(
-      StateInternals stateInternals,
-      TimerInternals timerInternals,
-      Coder<InputT> elementWireCoder,
-      Coder<RestrictionT> restrictionWireCoder,
-      Coder<BoundedWindow> windowCoder) {
-    this.stateInternals = stateInternals;
-    this.timerInternals = timerInternals;
-    this.windowCoder = windowCoder;
-    this.elementRestrictionWireCoder =
-        FullWindowedValueCoder.of(KvCoder.of(elementWireCoder, restrictionWireCoder), windowCoder);
-    this.seedTag = StateTags.value("seed", elementRestrictionWireCoder);
-    this.restrictionTag = StateTags.value("restriction", restrictionWireCoder);
-  }
-
-  /** Passes the initial element/restriction pair. */
-  public void seed(WindowedValue<KV<InputT, RestrictionT>> elementRestriction) {
-    initState(
-        StateNamespaces.window(
-            windowCoder, Iterables.getOnlyElement(elementRestriction.getWindows())));
-    seedState.write(elementRestriction);
-    inputTimestamp = elementRestriction.getTimestamp();
-  }
-
-  /**
-   * Resumes from a timer and returns the current element/restriction pair (with an up-to-date value
-   * of the restriction).
-   */
-  public WindowedValue<KV<InputT, RestrictionT>> resume(TimerData timer) {
-    initState(timer.getNamespace());
-    WindowedValue<KV<InputT, RestrictionT>> seed = seedState.read();
-    inputTimestamp = seed.getTimestamp();
-    return seed.withValue(KV.of(seed.getValue().getKey(), restrictionState.read()));
-  }
-
-  /**
-   * Commits the state and timers: clears both if no checkpoint happened, or adjusts the restriction
-   * and sets a wake-up timer if a checkpoint happened.
-   */
-  public void commit() throws IOException {
-    if (primaryRoots == null) {
-      // No split - the call terminated.
-      seedState.clear();
-      restrictionState.clear();
-      holdState.clear();
-      return;
-    }
-
-    // For now can only happen on the first instruction which is SPLITTABLE_PROCESS_ELEMENTS.
-    checkArgument(residualRoots.size() == 1, "More than 1 residual is unsupported for now");
-    DelayedBundleApplication residual = residualRoots.get(0);
-
-    ByteString encodedResidual = residual.getApplication().getElement();
-    WindowedValue<KV<InputT, RestrictionT>> decodedResidual =
-        elementRestrictionWireCoder.decode(encodedResidual.newInput());
-
-    restrictionState.write(decodedResidual.getValue().getValue());
-
-    Instant watermarkHold =
-        residual.getApplication().getOutputWatermarksMap().isEmpty()
-            ? inputTimestamp
-            : new Instant(
-                Iterables.getOnlyElement(
-                    residual.getApplication().getOutputWatermarksMap().values()));
-    checkArgument(
-        !watermarkHold.isBefore(inputTimestamp),
-        "Watermark hold %s can not be before input timestamp %s",
-        watermarkHold,
-        inputTimestamp);
-    holdState.add(watermarkHold);
-
-    Instant requestedWakeupTime =
-        new Instant(
-            System.currentTimeMillis() + Durations.toMillis(residual.getRequestedTimeDelay()));
-    Instant wakeupTime =
-        timerInternals.currentProcessingTime().isBefore(requestedWakeupTime)
-            ? requestedWakeupTime
-            : timerInternals.currentProcessingTime();
-
-    // Set a timer to continue processing this element.
-    timerInternals.setTimer(
-        stateNamespace,
-        "sdfContinuation",
-        "sdfContinuation",
-        wakeupTime,
-        wakeupTime,
-        TimeDomain.PROCESSING_TIME);
-  }
-
-  /** Signals that a split happened. */
-  public void split(
-      List<BundleApplication> primaryRoots, List<DelayedBundleApplication> residualRoots) {
-    checkState(
-        this.primaryRoots == null,
-        "At most 1 split supported, however got new split (%s, %s) "
-            + "in addition to existing (%s, %s)",
-        primaryRoots,
-        residualRoots,
-        this.primaryRoots,
-        this.residualRoots);
-    this.primaryRoots = primaryRoots;
-    this.residualRoots = residualRoots;
-  }
-
-  private void initState(StateNamespace ns) {
-    stateNamespace = ns;
-    seedState = stateInternals.state(ns, seedTag);
-    restrictionState = stateInternals.state(ns, restrictionTag);
-    holdState = stateInternals.state(ns, watermarkHoldTag);
-  }
-}
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/splittabledofn/package-info.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/splittabledofn/package-info.java
deleted file mode 100644
index e50f14d..0000000
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/splittabledofn/package-info.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/** Utilities for a Beam runner to interact with a remotely running splittable DoFn. */
-package org.apache.beam.runners.fnexecution.splittabledofn;