You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2018/01/18 17:52:23 UTC

[beam] branch master updated: [BEAM-2863] Fix translation of side inputs due to missing method.

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f12558  [BEAM-2863] Fix translation of side inputs due to missing method.
6f12558 is described below

commit 6f12558a7403e1d834719fc8562be386353c6f7b
Author: Luke Cwik <lc...@google.com>
AuthorDate: Thu Jan 18 09:17:56 2018 -0800

    [BEAM-2863] Fix translation of side inputs due to missing method.
---
 .../core/construction/ForwardingPTransform.java    |  8 +++++
 .../construction/ForwardingPTransformTest.java     | 38 +++++++++++++++-------
 2 files changed, 35 insertions(+), 11 deletions(-)

diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ForwardingPTransform.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ForwardingPTransform.java
index ccf41f3..532c1f1 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ForwardingPTransform.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ForwardingPTransform.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.core.construction;
 
+import java.util.Map;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -24,6 +25,8 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
 
 /**
  * A base class for implementing {@link PTransform} overrides, which behave identically to the
@@ -49,6 +52,11 @@ public abstract class ForwardingPTransform<InputT extends PInput, OutputT extend
   }
 
   @Override
+  public Map<TupleTag<?>, PValue> getAdditionalInputs() {
+    return delegate().getAdditionalInputs();
+  }
+
+  @Override
   public void validate(PipelineOptions options) {
     delegate().validate(options);
   }
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ForwardingPTransformTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ForwardingPTransformTest.java
index 4741b6b..979a978 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ForwardingPTransformTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ForwardingPTransformTest.java
@@ -20,12 +20,21 @@ package org.apache.beam.runners.core.construction;
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
+import com.google.common.collect.ImmutableMap;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.junit.Before;
 import org.junit.Rule;
@@ -34,7 +43,6 @@ import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.mockito.Mock;
-import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 
 /**
@@ -63,10 +71,10 @@ public class ForwardingPTransformTest {
   @Test
   public void applyDelegates() {
     @SuppressWarnings("unchecked")
-    PCollection<Integer> collection = Mockito.mock(PCollection.class);
+    PCollection<Integer> collection = mock(PCollection.class);
     @SuppressWarnings("unchecked")
-    PCollection<String> output = Mockito.mock(PCollection.class);
-    Mockito.when(delegate.expand(collection)).thenReturn(output);
+    PCollection<String> output = mock(PCollection.class);
+    when(delegate.expand(collection)).thenReturn(output);
     PCollection<String> result = forwarding.expand(collection);
     assertThat(result, equalTo(output));
   }
@@ -74,15 +82,23 @@ public class ForwardingPTransformTest {
   @Test
   public void getNameDelegates() {
     String name = "My_forwardingptransform-name;for!thisTest";
-    Mockito.when(delegate.getName()).thenReturn(name);
+    when(delegate.getName()).thenReturn(name);
     assertThat(forwarding.getName(), equalTo(name));
   }
 
   @Test
+  public void getAdditionalInputsDelegates() {
+    Map<TupleTag<?>, PValue> additionalInputs = ImmutableMap.<TupleTag<?>, PValue>of(
+        new TupleTag<Object>("test_tag"), Pipeline.create().apply(Create.of("1")));
+    when(delegate.getAdditionalInputs()).thenReturn(additionalInputs);
+    assertThat(forwarding.getAdditionalInputs(), equalTo(additionalInputs));
+  }
+
+  @Test
   public void validateDelegates() {
     @SuppressWarnings("unchecked")
-    PipelineOptions options = Mockito.mock(PipelineOptions.class);
-    Mockito.doThrow(RuntimeException.class).when(delegate).validate(options);
+    PipelineOptions options = mock(PipelineOptions.class);
+    doThrow(RuntimeException.class).when(delegate).validate(options);
 
     thrown.expect(RuntimeException.class);
     forwarding.validate(options);
@@ -104,16 +120,16 @@ public class ForwardingPTransformTest {
         PCollection.IsBounded.BOUNDED,
         null /* coder */);
     @SuppressWarnings("unchecked")
-    Coder<String> outputCoder = Mockito.mock(Coder.class);
+    Coder<String> outputCoder = mock(Coder.class);
 
-    Mockito.when(delegate.expand(input)).thenReturn(output);
-    Mockito.when(delegate.getDefaultOutputCoder(input, output)).thenReturn(outputCoder);
+    when(delegate.expand(input)).thenReturn(output);
+    when(delegate.getDefaultOutputCoder(input, output)).thenReturn(outputCoder);
     assertThat(forwarding.expand(input).getCoder(), equalTo(outputCoder));
   }
 
   @Test
   public void populateDisplayDataDelegates() {
-    Mockito.doThrow(RuntimeException.class)
+    doThrow(RuntimeException.class)
         .when(delegate)
         .populateDisplayData(any(DisplayData.Builder.class));
 

-- 
To stop receiving notification emails like this one, please contact
['"commits@beam.apache.org" <co...@beam.apache.org>'].