You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/15 22:29:07 UTC
[07/10] incubator-beam git commit: Removes
ArgumentProvider.windowingInternals
Removes ArgumentProvider.windowingInternals
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f3e8a038
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f3e8a038
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f3e8a038
Branch: refs/heads/master
Commit: f3e8a0383bf9cb3f9452e0364f7deba113cadff9
Parents: a22de15
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Dec 9 17:23:15 2016 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Dec 15 13:58:43 2016 -0800
----------------------------------------------------------------------
.../beam/runners/core/SimpleDoFnRunner.java | 57 --------------------
.../beam/runners/core/SplittableParDo.java | 7 ---
.../beam/sdk/transforms/DoFnAdapters.java | 14 -----
.../apache/beam/sdk/transforms/DoFnTester.java | 7 ---
.../sdk/transforms/reflect/DoFnInvoker.java | 20 -------
.../transforms/reflect/DoFnInvokersTest.java | 6 ---
6 files changed, 111 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f3e8a038/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index af7f5ca..041cdde 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -52,13 +52,10 @@ import org.apache.beam.sdk.util.ExecutionContext.StepContext;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.util.Timer;
-import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingInternals;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.util.state.StateNamespace;
import org.apache.beam.sdk.util.state.StateNamespaces;
import org.apache.beam.sdk.util.state.StateSpec;
@@ -420,11 +417,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
}
@Override
- public WindowingInternals<InputT, OutputT> windowingInternals() {
- throw new UnsupportedOperationException("WindowingInternals are unsupported.");
- }
-
- @Override
public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() {
throw new UnsupportedOperationException(
"Cannot access RestrictionTracker outside of @ProcessElement method.");
@@ -634,54 +626,5 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
throw new UnsupportedOperationException("Timer parameters are not supported.");
}
- @Override
- public WindowingInternals<InputT, OutputT> windowingInternals() {
- return new WindowingInternals<InputT, OutputT>() {
- @Override
- public Collection<? extends BoundedWindow> windows() {
- return windowedValue.getWindows();
- }
-
- @Override
- public PaneInfo pane() {
- return windowedValue.getPane();
- }
-
- @Override
- public TimerInternals timerInternals() {
- return context.stepContext.timerInternals();
- }
-
- @Override
- public StateInternals<?> stateInternals() {
- return stepContext.stateInternals();
- }
-
- @Override
- public void outputWindowedValue(
- OutputT output,
- Instant timestamp,
- Collection<? extends BoundedWindow> windows,
- PaneInfo pane) {
- throw new UnsupportedOperationException("A DoFn cannot output to a different window");
- }
-
- @Override
- public <SideOutputT> void sideOutputWindowedValue(
- TupleTag<SideOutputT> tag,
- SideOutputT output,
- Instant timestamp,
- Collection<? extends BoundedWindow> windows,
- PaneInfo pane) {
- throw new UnsupportedOperationException(
- "A DoFn cannot side output to a different window");
- }
-
- @Override
- public <T> T sideInput(PCollectionView<T> view, BoundedWindow sideInputWindow) {
- return context.sideInput(view, sideInputWindow);
- }
- };
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f3e8a038/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index 8a9bfcd..720db63 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -53,7 +53,6 @@ import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.Timer;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingInternals;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.State;
import org.apache.beam.sdk.util.state.StateInternals;
@@ -685,12 +684,6 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
}
@Override
- public WindowingInternals<InputT, OutputT> windowingInternals() {
- // DoFnSignatures should have verified that this DoFn doesn't access extra context.
- throw new IllegalStateException("Unexpected extra context access on a splittable DoFn");
- }
-
- @Override
public TrackerT restrictionTracker() {
return tracker;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f3e8a038/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
index d1c40a6..0a71faa 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
@@ -32,7 +32,6 @@ import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.Timer;
-import org.apache.beam.sdk.util.WindowingInternals;
import org.apache.beam.sdk.util.state.State;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
@@ -202,14 +201,6 @@ public class DoFnAdapters {
}
@Override
- public WindowingInternals<InputT, OutputT> windowingInternals() {
- // The OldDoFn doesn't allow us to ask for these outside ProcessElements, so this
- // should be unreachable.
- throw new UnsupportedOperationException(
- "Can only get WindowingInternals in processElement");
- }
-
- @Override
public DoFn.InputProvider<InputT> inputProvider() {
throw new UnsupportedOperationException("inputProvider() exists only for testing");
}
@@ -322,11 +313,6 @@ public class DoFnAdapters {
}
@Override
- public WindowingInternals<InputT, OutputT> windowingInternals() {
- return context.windowingInternals();
- }
-
- @Override
public DoFn.InputProvider<InputT> inputProvider() {
throw new UnsupportedOperationException("inputProvider() exists only for testing");
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f3e8a038/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 93b3f59..527d529 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -49,7 +49,6 @@ import org.apache.beam.sdk.util.Timer;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingInternals;
import org.apache.beam.sdk.util.state.InMemoryStateInternals;
import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
import org.apache.beam.sdk.util.state.StateInternals;
@@ -335,12 +334,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
}
@Override
- public WindowingInternals<InputT, OutputT> windowingInternals() {
- throw new UnsupportedOperationException(
- "Not expected to access WindowingInternals from a new DoFn");
- }
-
- @Override
public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() {
throw new UnsupportedOperationException(
"Not expected to access RestrictionTracker from a regular DoFn in DoFnTester");
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f3e8a038/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
index 97ac9d3..354578e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -27,11 +27,9 @@ import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.transforms.DoFn.StartBundle;
import org.apache.beam.sdk.transforms.DoFn.StateId;
import org.apache.beam.sdk.transforms.DoFn.TimerId;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.Timer;
-import org.apache.beam.sdk.util.WindowingInternals;
import org.apache.beam.sdk.util.state.State;
/**
@@ -122,19 +120,6 @@ public interface DoFnInvoker<InputT, OutputT> {
OutputReceiver<OutputT> outputReceiver();
/**
- * For migration from {@link OldDoFn} to {@link DoFn}, provide a {@link WindowingInternals} so
- * an {@link OldDoFn} can be run via {@link DoFnInvoker}.
- *
- * <p>This is <i>not</i> exposed via the reflective capabilities of {@link DoFn}.
- *
- * @deprecated Please port occurences of {@link OldDoFn} to {@link DoFn}. If they require state
- * and timers, they will need to wait for the arrival of those features. Do not introduce
- * new uses of this method.
- */
- @Deprecated
- WindowingInternals<InputT, OutputT> windowingInternals();
-
- /**
* If this is a splittable {@link DoFn}, returns the {@link RestrictionTracker} associated with
* the current {@link ProcessElement} call.
*/
@@ -180,11 +165,6 @@ public interface DoFnInvoker<InputT, OutputT> {
}
@Override
- public WindowingInternals<InputT, OutputT> windowingInternals() {
- return null;
- }
-
- @Override
public State state(String stateId) {
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f3e8a038/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 55b8e7e..4c6bee1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -40,7 +40,6 @@ import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
-import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker.FakeArgumentProvider;
import org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
@@ -51,7 +50,6 @@ import org.apache.beam.sdk.util.Timer;
import org.apache.beam.sdk.util.TimerSpec;
import org.apache.beam.sdk.util.TimerSpecs;
import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.WindowingInternals;
import org.apache.beam.sdk.util.state.StateSpec;
import org.apache.beam.sdk.util.state.StateSpecs;
import org.apache.beam.sdk.util.state.ValueState;
@@ -77,18 +75,14 @@ public class DoFnInvokersTest {
@Mock private IntervalWindow mockWindow;
@Mock private DoFn.InputProvider<String> mockInputProvider;
@Mock private DoFn.OutputReceiver<String> mockOutputReceiver;
- @Mock private WindowingInternals<String, String> mockWindowingInternals;
@Mock private DoFnInvoker.ArgumentProvider<String, String> mockArgumentProvider;
- @Mock private OldDoFn<String, String> mockOldDoFn;
-
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
when(mockArgumentProvider.window()).thenReturn(mockWindow);
when(mockArgumentProvider.inputProvider()).thenReturn(mockInputProvider);
when(mockArgumentProvider.outputReceiver()).thenReturn(mockOutputReceiver);
- when(mockArgumentProvider.windowingInternals()).thenReturn(mockWindowingInternals);
when(mockArgumentProvider.processContext(Matchers.<DoFn>any())).thenReturn(mockProcessContext);
}