You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/13 22:45:37 UTC

[1/2] beam git commit: Remove Triggers.java from SDK entirely

Repository: beam
Updated Branches:
  refs/heads/master 75d7b273c -> 8761d8642


Remove Triggers.java from SDK entirely

Update DataflowRunner with support for this removal


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6832668a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6832668a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6832668a

Branch: refs/heads/master
Commit: 6832668ac6888d3e8d56819a7a73ff05798c7082
Parents: 75d7b27
Author: Dan Halperin <dh...@google.com>
Authored: Thu Apr 13 14:52:25 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Apr 13 15:45:21 2017 -0700

----------------------------------------------------------------------
 runners/google-cloud-dataflow-java/pom.xml      |   2 +-
 .../beam/sdk/transforms/windowing/Triggers.java | 323 -------------------
 2 files changed, 1 insertion(+), 324 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6832668a/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index 96eced8..c33faf7 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -33,7 +33,7 @@
   <packaging>jar</packaging>
 
   <properties>
-    <dataflow.container_version>beam-master-20170411</dataflow.container_version>
+    <dataflow.container_version>beam-master-20170413</dataflow.container_version>
     <dataflow.fnapi_environment_major_version>1</dataflow.fnapi_environment_major_version>
     <dataflow.legacy_environment_major_version>6</dataflow.legacy_environment_major_version>
   </properties>

http://git-wip-us.apache.org/repos/asf/beam/blob/6832668a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Triggers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Triggers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Triggers.java
deleted file mode 100644
index 47f05e4..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Triggers.java
+++ /dev/null
@@ -1,323 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import java.io.Serializable;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.List;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark.AfterWatermarkEarlyAndLate;
-import org.apache.beam.sdk.transforms.windowing.Never.NeverTrigger;
-import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-import org.apache.beam.sdk.util.ReshuffleTrigger;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-/** Utilities for working with {@link Triggers Triggers}. */
-@Experimental(Experimental.Kind.TRIGGER)
-@Deprecated
-public class Triggers implements Serializable {
-
-  @VisibleForTesting static final ProtoConverter CONVERTER = new ProtoConverter();
-
-  public static RunnerApi.Trigger toProto(Trigger trigger) {
-    return CONVERTER.convertTrigger(trigger);
-  }
-
-  @VisibleForTesting
-  static class ProtoConverter {
-
-    public RunnerApi.Trigger convertTrigger(Trigger trigger) {
-      Method evaluationMethod = getEvaluationMethod(trigger.getClass());
-      return tryConvert(evaluationMethod, trigger);
-    }
-
-    private RunnerApi.Trigger tryConvert(Method evaluationMethod, Trigger trigger) {
-      try {
-        return (RunnerApi.Trigger) evaluationMethod.invoke(this, trigger);
-      } catch (InvocationTargetException exc) {
-        if (exc.getCause() instanceof RuntimeException) {
-          throw (RuntimeException) exc.getCause();
-        } else {
-          throw new RuntimeException(exc.getCause());
-        }
-      } catch (IllegalAccessException exc) {
-        throw new IllegalStateException(
-            String.format("Internal error: could not invoke %s", evaluationMethod));
-      }
-    }
-
-    private Method getEvaluationMethod(Class<?> clazz) {
-      try {
-        return getClass().getDeclaredMethod("convertSpecific", clazz);
-      } catch (NoSuchMethodException exc) {
-        throw new IllegalArgumentException(
-            String.format(
-                "Cannot translate trigger class %s to a runner-API proto.",
-                clazz.getCanonicalName()),
-            exc);
-      }
-    }
-
-    private RunnerApi.Trigger convertSpecific(DefaultTrigger v) {
-      return RunnerApi.Trigger.newBuilder()
-          .setDefault(RunnerApi.Trigger.Default.getDefaultInstance())
-          .build();
-    }
-
-    private RunnerApi.Trigger convertSpecific(AfterWatermark.FromEndOfWindow v) {
-      return RunnerApi.Trigger.newBuilder()
-          .setAfterEndOfWindow(RunnerApi.Trigger.AfterEndOfWindow.newBuilder())
-          .build();
-    }
-
-    private RunnerApi.Trigger convertSpecific(NeverTrigger v) {
-      return RunnerApi.Trigger.newBuilder()
-          .setNever(RunnerApi.Trigger.Never.getDefaultInstance())
-          .build();
-    }
-
-    private RunnerApi.Trigger convertSpecific(ReshuffleTrigger v) {
-      return RunnerApi.Trigger.newBuilder()
-          .setAlways(RunnerApi.Trigger.Always.getDefaultInstance())
-          .build();
-    }
-
-    private RunnerApi.Trigger convertSpecific(AfterSynchronizedProcessingTime v) {
-      return RunnerApi.Trigger.newBuilder()
-          .setAfterSynchronizedProcessingTime(
-              RunnerApi.Trigger.AfterSynchronizedProcessingTime.getDefaultInstance())
-          .build();
-    }
-
-    private RunnerApi.TimeDomain convertTimeDomain(TimeDomain timeDomain) {
-      switch (timeDomain) {
-        case EVENT_TIME:
-          return RunnerApi.TimeDomain.EVENT_TIME;
-        case PROCESSING_TIME:
-          return RunnerApi.TimeDomain.PROCESSING_TIME;
-        case SYNCHRONIZED_PROCESSING_TIME:
-          return RunnerApi.TimeDomain.SYNCHRONIZED_PROCESSING_TIME;
-        default:
-          throw new IllegalArgumentException(String.format("Unknown time domain: %s", timeDomain));
-      }
-    }
-
-    private RunnerApi.Trigger convertSpecific(AfterFirst v) {
-      RunnerApi.Trigger.AfterAny.Builder builder = RunnerApi.Trigger.AfterAny.newBuilder();
-
-      for (Trigger subtrigger : v.subTriggers()) {
-        builder.addSubtriggers(toProto(subtrigger));
-      }
-
-      return RunnerApi.Trigger.newBuilder().setAfterAny(builder).build();
-    }
-
-    private RunnerApi.Trigger convertSpecific(AfterAll v) {
-      RunnerApi.Trigger.AfterAll.Builder builder = RunnerApi.Trigger.AfterAll.newBuilder();
-
-      for (Trigger subtrigger : v.subTriggers()) {
-        builder.addSubtriggers(toProto(subtrigger));
-      }
-
-      return RunnerApi.Trigger.newBuilder().setAfterAll(builder).build();
-    }
-
-    private RunnerApi.Trigger convertSpecific(AfterPane v) {
-      return RunnerApi.Trigger.newBuilder()
-          .setElementCount(
-              RunnerApi.Trigger.ElementCount.newBuilder().setElementCount(v.getElementCount()))
-          .build();
-    }
-
-    private RunnerApi.Trigger convertSpecific(AfterWatermark.AfterWatermarkEarlyAndLate v) {
-      RunnerApi.Trigger.AfterEndOfWindow.Builder builder =
-          RunnerApi.Trigger.AfterEndOfWindow.newBuilder();
-
-      builder.setEarlyFirings(toProto(v.getEarlyTrigger()));
-      if (v.getLateTrigger() != null) {
-        builder.setLateFirings(toProto(v.getLateTrigger()));
-      }
-
-      return RunnerApi.Trigger.newBuilder().setAfterEndOfWindow(builder).build();
-    }
-
-    private RunnerApi.Trigger convertSpecific(AfterEach v) {
-      RunnerApi.Trigger.AfterEach.Builder builder = RunnerApi.Trigger.AfterEach.newBuilder();
-
-      for (Trigger subtrigger : v.subTriggers()) {
-        builder.addSubtriggers(toProto(subtrigger));
-      }
-
-      return RunnerApi.Trigger.newBuilder().setAfterEach(builder).build();
-    }
-
-    private RunnerApi.Trigger convertSpecific(Repeatedly v) {
-      return RunnerApi.Trigger.newBuilder()
-          .setRepeat(
-              RunnerApi.Trigger.Repeat.newBuilder()
-                  .setSubtrigger(toProto(v.getRepeatedTrigger())))
-          .build();
-    }
-
-    private RunnerApi.Trigger convertSpecific(OrFinallyTrigger v) {
-      return RunnerApi.Trigger.newBuilder()
-          .setOrFinally(
-              RunnerApi.Trigger.OrFinally.newBuilder()
-                  .setMain(toProto(v.getMainTrigger()))
-                  .setFinally(toProto(v.getUntilTrigger())))
-          .build();
-    }
-
-    private RunnerApi.Trigger convertSpecific(AfterProcessingTime v) {
-      RunnerApi.Trigger.AfterProcessingTime.Builder builder =
-          RunnerApi.Trigger.AfterProcessingTime.newBuilder();
-
-      for (TimestampTransform transform : v.getTimestampTransforms()) {
-        builder.addTimestampTransforms(convertTimestampTransform(transform));
-      }
-
-      return RunnerApi.Trigger.newBuilder().setAfterProcessingTime(builder).build();
-    }
-
-    private RunnerApi.TimestampTransform convertTimestampTransform(TimestampTransform transform) {
-      if (transform instanceof TimestampTransform.Delay) {
-        return RunnerApi.TimestampTransform.newBuilder()
-            .setDelay(
-                RunnerApi.TimestampTransform.Delay.newBuilder()
-                    .setDelayMillis(((TimestampTransform.Delay) transform).getDelay().getMillis()))
-            .build();
-      } else if (transform instanceof TimestampTransform.AlignTo) {
-        TimestampTransform.AlignTo alignTo = (TimestampTransform.AlignTo) transform;
-        return RunnerApi.TimestampTransform.newBuilder()
-            .setAlignTo(
-                RunnerApi.TimestampTransform.AlignTo.newBuilder()
-                    .setPeriod(alignTo.getPeriod().getMillis())
-                    .setOffset(alignTo.getOffset().getMillis()))
-            .build();
-
-      } else {
-        throw new IllegalArgumentException(
-            String.format("Unknown %s: %s", TimestampTransform.class.getSimpleName(), transform));
-      }
-    }
-  }
-
-  public static Trigger fromProto(RunnerApi.Trigger triggerProto) {
-    switch (triggerProto.getTriggerCase()) {
-      case AFTER_ALL:
-        return AfterAll.of(protosToTriggers(triggerProto.getAfterAll().getSubtriggersList()));
-      case AFTER_ANY:
-        return AfterFirst.of(protosToTriggers(triggerProto.getAfterAny().getSubtriggersList()));
-      case AFTER_EACH:
-        return AfterEach.inOrder(
-            protosToTriggers(triggerProto.getAfterEach().getSubtriggersList()));
-      case AFTER_END_OF_WINDOW:
-        RunnerApi.Trigger.AfterEndOfWindow eowProto = triggerProto.getAfterEndOfWindow();
-
-        if (!eowProto.hasEarlyFirings() && !eowProto.hasLateFirings()) {
-          return AfterWatermark.pastEndOfWindow();
-        }
-
-        // It either has early or late firings or both; our typing in Java makes this a smidge
-        // annoying
-        if (triggerProto.getAfterEndOfWindow().hasEarlyFirings()) {
-          AfterWatermarkEarlyAndLate trigger =
-              AfterWatermark.pastEndOfWindow()
-                  .withEarlyFirings(
-                      (OnceTrigger)
-                          fromProto(triggerProto.getAfterEndOfWindow().getEarlyFirings()));
-
-          if (triggerProto.getAfterEndOfWindow().hasLateFirings()) {
-            trigger =
-                trigger.withLateFirings(
-                    (OnceTrigger)
-                        fromProto(triggerProto.getAfterEndOfWindow().getLateFirings()));
-          }
-          return trigger;
-        } else {
-          // only late firings, so return directly
-          return AfterWatermark.pastEndOfWindow()
-              .withLateFirings((OnceTrigger) fromProto(eowProto.getLateFirings()));
-        }
-      case AFTER_PROCESSING_TIME:
-        AfterProcessingTime trigger = AfterProcessingTime.pastFirstElementInPane();
-        for (RunnerApi.TimestampTransform transform :
-            triggerProto.getAfterProcessingTime().getTimestampTransformsList()) {
-          switch (transform.getTimestampTransformCase()) {
-            case ALIGN_TO:
-              trigger =
-                  trigger.alignedTo(
-                      Duration.millis(transform.getAlignTo().getPeriod()),
-                      new Instant(transform.getAlignTo().getOffset()));
-              break;
-            case DELAY:
-              trigger = trigger.plusDelayOf(Duration.millis(transform.getDelay().getDelayMillis()));
-              break;
-            case TIMESTAMPTRANSFORM_NOT_SET:
-              throw new IllegalArgumentException(
-                  String.format(
-                      "Required field 'timestamp_transform' not set in %s", transform));
-            default:
-              throw new IllegalArgumentException(
-                  String.format(
-                      "Unknown timestamp transform case: %s",
-                      transform.getTimestampTransformCase()));
-          }
-        }
-        return trigger;
-      case AFTER_SYNCHRONIZED_PROCESSING_TIME:
-        return AfterSynchronizedProcessingTime.ofFirstElement();
-      case ALWAYS:
-        return new ReshuffleTrigger();
-      case ELEMENT_COUNT:
-        return AfterPane.elementCountAtLeast(triggerProto.getElementCount().getElementCount());
-      case NEVER:
-        return Never.ever();
-      case OR_FINALLY:
-        return fromProto(triggerProto.getOrFinally().getMain())
-            .orFinally((OnceTrigger) fromProto(triggerProto.getOrFinally().getFinally()));
-      case REPEAT:
-        return Repeatedly.forever(fromProto(triggerProto.getRepeat().getSubtrigger()));
-      case DEFAULT:
-        return DefaultTrigger.of();
-      case TRIGGER_NOT_SET:
-        throw new IllegalArgumentException(
-            String.format("Required field 'trigger' not set in %s", triggerProto));
-      default:
-        throw new IllegalArgumentException(
-            String.format("Unknown trigger case: %s", triggerProto.getTriggerCase()));
-    }
-  }
-
-  private static List<Trigger> protosToTriggers(List<RunnerApi.Trigger> triggers) {
-    List<Trigger> result = Lists.newArrayList();
-    for (RunnerApi.Trigger trigger : triggers) {
-      result.add(fromProto(trigger));
-    }
-    return result;
-  }
-
-  // Do not instantiate
-  private Triggers() {}
-}


[2/2] beam git commit: This closes #2529

Posted by dh...@apache.org.
This closes #2529


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8761d864
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8761d864
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8761d864

Branch: refs/heads/master
Commit: 8761d8642e20a7504504d479b8ffd70c55f1b43d
Parents: 75d7b27 6832668
Author: Dan Halperin <dh...@google.com>
Authored: Thu Apr 13 15:45:27 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Apr 13 15:45:27 2017 -0700

----------------------------------------------------------------------
 runners/google-cloud-dataflow-java/pom.xml      |   2 +-
 .../beam/sdk/transforms/windowing/Triggers.java | 323 -------------------
 2 files changed, 1 insertion(+), 324 deletions(-)
----------------------------------------------------------------------