You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/01/17 17:53:08 UTC

[2/7] beam git commit: Removes ArgumentProvider.windowingInternals

Removes ArgumentProvider.windowingInternals


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/50979f72
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/50979f72
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/50979f72

Branch: refs/heads/master
Commit: 50979f7262203987ef3ec4a3fbfeeb3f4ae769e7
Parents: ad5eb06
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Dec 9 17:23:15 2016 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Jan 12 12:56:27 2017 -0800

----------------------------------------------------------------------
 .../beam/runners/core/SimpleDoFnRunner.java     | 63 --------------------
 .../beam/runners/core/SplittableParDo.java      |  7 ---
 .../beam/sdk/transforms/DoFnAdapters.java       | 14 -----
 .../apache/beam/sdk/transforms/DoFnTester.java  |  7 ---
 .../sdk/transforms/reflect/DoFnInvoker.java     | 20 -------
 .../transforms/reflect/DoFnInvokersTest.java    |  6 --
 6 files changed, 117 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/50979f72/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index b42c57d..df5f3f6 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -56,10 +56,8 @@ import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.TimerSpec;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateNamespace;
 import org.apache.beam.sdk.util.state.StateNamespaces;
 import org.apache.beam.sdk.util.state.StateSpec;
@@ -451,11 +449,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
     }
 
     @Override
-    public WindowingInternals<InputT, OutputT> windowingInternals() {
-      throw new UnsupportedOperationException("WindowingInternals are unsupported.");
-    }
-
-    @Override
     public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() {
       throw new UnsupportedOperationException(
           "Cannot access RestrictionTracker outside of @ProcessElement method.");
@@ -670,57 +663,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
         throw new RuntimeException(e);
       }
     }
-
-    @Override
-    public WindowingInternals<InputT, OutputT> windowingInternals() {
-      return new WindowingInternals<InputT, OutputT>() {
-        @Override
-        public Collection<? extends BoundedWindow> windows() {
-          return windowedValue.getWindows();
-        }
-
-        @Override
-        public PaneInfo pane() {
-          return windowedValue.getPane();
-        }
-
-        @Override
-        public TimerInternals timerInternals() {
-          return context.stepContext.timerInternals();
-        }
-
-        @Override
-        public StateInternals<?> stateInternals() {
-          return stepContext.stateInternals();
-        }
-
-        @Override
-        public void outputWindowedValue(
-            OutputT output,
-            Instant timestamp,
-            Collection<? extends BoundedWindow> windows,
-            PaneInfo pane) {
-          throw new UnsupportedOperationException("A DoFn cannot output to a different window");
-        }
-
-        @Override
-        public <SideOutputT> void sideOutputWindowedValue(
-            TupleTag<SideOutputT> tag,
-            SideOutputT output,
-            Instant timestamp,
-            Collection<? extends BoundedWindow> windows,
-            PaneInfo pane) {
-          throw new UnsupportedOperationException(
-              "A DoFn cannot side output to a different window");
-        }
-
-        @Override
-        public <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow) {
-          return context.sideInput(view, sideInputWindow);
-        }
-      };
-    }
-
   }
 
   /**
@@ -871,11 +813,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
         CombineFn<AggInputT, ?, AggOutputT> combiner) {
       throw new UnsupportedOperationException("Cannot createAggregator in @OnTimer method");
     }
-
-    @Override
-    public WindowingInternals<InputT, OutputT> windowingInternals() {
-      throw new UnsupportedOperationException("WindowingInternals are unsupported.");
-    }
   }
 
   private static class TimerInternalsTimer implements Timer {

http://git-wip-us.apache.org/repos/asf/beam/blob/50979f72/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index e6a2466..f8d12ec 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -53,7 +53,6 @@ import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.Timer;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.State;
 import org.apache.beam.sdk.util.state.StateInternals;
@@ -685,12 +684,6 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
       }
 
       @Override
-      public WindowingInternals<InputT, OutputT> windowingInternals() {
-        // DoFnSignatures should have verified that this DoFn doesn't access extra context.
-        throw new IllegalStateException("Unexpected extra context access on a splittable DoFn");
-      }
-
-      @Override
       public TrackerT restrictionTracker() {
         return tracker;
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/50979f72/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
index d1c40a6..0a71faa 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
@@ -32,7 +32,6 @@ import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.Timer;
-import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.state.State;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
@@ -202,14 +201,6 @@ public class DoFnAdapters {
     }
 
     @Override
-    public WindowingInternals<InputT, OutputT> windowingInternals() {
-      // The OldDoFn doesn't allow us to ask for these outside ProcessElements, so this
-      // should be unreachable.
-      throw new UnsupportedOperationException(
-          "Can only get WindowingInternals in processElement");
-    }
-
-    @Override
     public DoFn.InputProvider<InputT> inputProvider() {
       throw new UnsupportedOperationException("inputProvider() exists only for testing");
     }
@@ -322,11 +313,6 @@ public class DoFnAdapters {
     }
 
     @Override
-    public WindowingInternals<InputT, OutputT> windowingInternals() {
-      return context.windowingInternals();
-    }
-
-    @Override
     public DoFn.InputProvider<InputT> inputProvider() {
       throw new UnsupportedOperationException("inputProvider() exists only for testing");
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/50979f72/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 2d8684a..b2c3fd5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -48,7 +48,6 @@ import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.Timer;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.state.InMemoryStateInternals;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -328,12 +327,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
             }
 
             @Override
-            public WindowingInternals<InputT, OutputT> windowingInternals() {
-              throw new UnsupportedOperationException(
-                  "Not expected to access WindowingInternals from a new DoFn");
-            }
-
-            @Override
             public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() {
               throw new UnsupportedOperationException(
                   "Not expected to access RestrictionTracker from a regular DoFn in DoFnTester");

http://git-wip-us.apache.org/repos/asf/beam/blob/50979f72/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
index 97ac9d3..354578e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -27,11 +27,9 @@ import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
 import org.apache.beam.sdk.transforms.DoFn.StartBundle;
 import org.apache.beam.sdk.transforms.DoFn.StateId;
 import org.apache.beam.sdk.transforms.DoFn.TimerId;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.Timer;
-import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.state.State;
 
 /**
@@ -122,19 +120,6 @@ public interface DoFnInvoker<InputT, OutputT> {
     OutputReceiver<OutputT> outputReceiver();
 
     /**
-     * For migration from {@link OldDoFn} to {@link DoFn}, provide a {@link WindowingInternals} so
-     * an {@link OldDoFn} can be run via {@link DoFnInvoker}.
-     *
-     * <p>This is <i>not</i> exposed via the reflective capabilities of {@link DoFn}.
-     *
-     * @deprecated Please port occurences of {@link OldDoFn} to {@link DoFn}. If they require state
-     *     and timers, they will need to wait for the arrival of those features. Do not introduce
-     *     new uses of this method.
-     */
-    @Deprecated
-    WindowingInternals<InputT, OutputT> windowingInternals();
-
-    /**
      * If this is a splittable {@link DoFn}, returns the {@link RestrictionTracker} associated with
      * the current {@link ProcessElement} call.
      */
@@ -180,11 +165,6 @@ public interface DoFnInvoker<InputT, OutputT> {
     }
 
     @Override
-    public WindowingInternals<InputT, OutputT> windowingInternals() {
-      return null;
-    }
-
-    @Override
     public State state(String stateId) {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/50979f72/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 55b8e7e..4c6bee1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -40,7 +40,6 @@ import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker.FakeArgumentProvider;
 import org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
@@ -51,7 +50,6 @@ import org.apache.beam.sdk.util.Timer;
 import org.apache.beam.sdk.util.TimerSpec;
 import org.apache.beam.sdk.util.TimerSpecs;
 import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.state.StateSpec;
 import org.apache.beam.sdk.util.state.StateSpecs;
 import org.apache.beam.sdk.util.state.ValueState;
@@ -77,18 +75,14 @@ public class DoFnInvokersTest {
   @Mock private IntervalWindow mockWindow;
   @Mock private DoFn.InputProvider<String> mockInputProvider;
   @Mock private DoFn.OutputReceiver<String> mockOutputReceiver;
-  @Mock private WindowingInternals<String, String> mockWindowingInternals;
   @Mock private DoFnInvoker.ArgumentProvider<String, String> mockArgumentProvider;
 
-  @Mock private OldDoFn<String, String> mockOldDoFn;
-
   @Before
   public void setUp() {
     MockitoAnnotations.initMocks(this);
     when(mockArgumentProvider.window()).thenReturn(mockWindow);
     when(mockArgumentProvider.inputProvider()).thenReturn(mockInputProvider);
     when(mockArgumentProvider.outputReceiver()).thenReturn(mockOutputReceiver);
-    when(mockArgumentProvider.windowingInternals()).thenReturn(mockWindowingInternals);
     when(mockArgumentProvider.processContext(Matchers.<DoFn>any())).thenReturn(mockProcessContext);
   }