You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2018/01/18 17:52:23 UTC
[beam] branch master updated: [BEAM-2863] Fix translation of side
inputs due to missing method.
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 6f12558 [BEAM-2863] Fix translation of side inputs due to missing method.
6f12558 is described below
commit 6f12558a7403e1d834719fc8562be386353c6f7b
Author: Luke Cwik <lc...@google.com>
AuthorDate: Thu Jan 18 09:17:56 2018 -0800
[BEAM-2863] Fix translation of side inputs due to missing method.
---
.../core/construction/ForwardingPTransform.java | 8 +++++
.../construction/ForwardingPTransformTest.java | 38 +++++++++++++++-------
2 files changed, 35 insertions(+), 11 deletions(-)
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ForwardingPTransform.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ForwardingPTransform.java
index ccf41f3..532c1f1 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ForwardingPTransform.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ForwardingPTransform.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.core.construction;
+import java.util.Map;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.PTransform;
@@ -24,6 +25,8 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
/**
* A base class for implementing {@link PTransform} overrides, which behave identically to the
@@ -49,6 +52,11 @@ public abstract class ForwardingPTransform<InputT extends PInput, OutputT extend
}
@Override
+ public Map<TupleTag<?>, PValue> getAdditionalInputs() {
+ return delegate().getAdditionalInputs();
+ }
+
+ @Override
public void validate(PipelineOptions options) {
delegate().validate(options);
}
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ForwardingPTransformTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ForwardingPTransformTest.java
index 4741b6b..979a978 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ForwardingPTransformTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ForwardingPTransformTest.java
@@ -20,12 +20,21 @@ package org.apache.beam.runners.core.construction;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.junit.Before;
import org.junit.Rule;
@@ -34,7 +43,6 @@ import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
-import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
/**
@@ -63,10 +71,10 @@ public class ForwardingPTransformTest {
@Test
public void applyDelegates() {
@SuppressWarnings("unchecked")
- PCollection<Integer> collection = Mockito.mock(PCollection.class);
+ PCollection<Integer> collection = mock(PCollection.class);
@SuppressWarnings("unchecked")
- PCollection<String> output = Mockito.mock(PCollection.class);
- Mockito.when(delegate.expand(collection)).thenReturn(output);
+ PCollection<String> output = mock(PCollection.class);
+ when(delegate.expand(collection)).thenReturn(output);
PCollection<String> result = forwarding.expand(collection);
assertThat(result, equalTo(output));
}
@@ -74,15 +82,23 @@ public class ForwardingPTransformTest {
@Test
public void getNameDelegates() {
String name = "My_forwardingptransform-name;for!thisTest";
- Mockito.when(delegate.getName()).thenReturn(name);
+ when(delegate.getName()).thenReturn(name);
assertThat(forwarding.getName(), equalTo(name));
}
@Test
+ public void getAdditionalInputsDelegates() {
+ Map<TupleTag<?>, PValue> additionalInputs = ImmutableMap.<TupleTag<?>, PValue>of(
+ new TupleTag<Object>("test_tag"), Pipeline.create().apply(Create.of("1")));
+ when(delegate.getAdditionalInputs()).thenReturn(additionalInputs);
+ assertThat(forwarding.getAdditionalInputs(), equalTo(additionalInputs));
+ }
+
+ @Test
public void validateDelegates() {
@SuppressWarnings("unchecked")
- PipelineOptions options = Mockito.mock(PipelineOptions.class);
- Mockito.doThrow(RuntimeException.class).when(delegate).validate(options);
+ PipelineOptions options = mock(PipelineOptions.class);
+ doThrow(RuntimeException.class).when(delegate).validate(options);
thrown.expect(RuntimeException.class);
forwarding.validate(options);
@@ -104,16 +120,16 @@ public class ForwardingPTransformTest {
PCollection.IsBounded.BOUNDED,
null /* coder */);
@SuppressWarnings("unchecked")
- Coder<String> outputCoder = Mockito.mock(Coder.class);
+ Coder<String> outputCoder = mock(Coder.class);
- Mockito.when(delegate.expand(input)).thenReturn(output);
- Mockito.when(delegate.getDefaultOutputCoder(input, output)).thenReturn(outputCoder);
+ when(delegate.expand(input)).thenReturn(output);
+ when(delegate.getDefaultOutputCoder(input, output)).thenReturn(outputCoder);
assertThat(forwarding.expand(input).getCoder(), equalTo(outputCoder));
}
@Test
public void populateDisplayDataDelegates() {
- Mockito.doThrow(RuntimeException.class)
+ doThrow(RuntimeException.class)
.when(delegate)
.populateDisplayData(any(DisplayData.Builder.class));
--
To stop receiving notification emails like this one, please contact
['"commits@beam.apache.org" <co...@beam.apache.org>'].