You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/08/07 23:53:50 UTC
[25/50] [abbrv] beam git commit: Update against master changes
Update against master changes
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c9aac967
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c9aac967
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c9aac967
Branch: refs/heads/master
Commit: c9aac967bccba5c3e1a0a3e4d84a8def1bfa2581
Parents: 9e6c906
Author: manuzhang <ow...@gmail.com>
Authored: Mon Jun 5 12:38:30 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Mon Jun 5 16:29:25 2017 +0800
----------------------------------------------------------------------
runners/gearpump/pom.xml | 1 +
.../gearpump/GearpumpPipelineTranslator.java | 9 +--
.../beam/runners/gearpump/GearpumpRunner.java | 7 +-
.../CreatePCollectionViewTranslator.java | 45 ------------
.../ParDoSingleOutputTranslator.java | 75 --------------------
.../translators/TranslationContext.java | 5 +-
.../translators/utils/DoFnRunnerFactory.java | 6 +-
.../translators/utils/NoOpStepContext.java | 37 +---------
.../translators/utils/TranslatorUtils.java | 1 -
.../CreatePCollectionViewTranslatorTest.java | 55 --------------
10 files changed, 17 insertions(+), 224 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/c9aac967/runners/gearpump/pom.xml
----------------------------------------------------------------------
diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml
index d4dade1..beb7753 100644
--- a/runners/gearpump/pom.xml
+++ b/runners/gearpump/pom.xml
@@ -86,6 +86,7 @@
]
</beamTestPipelineOptions>
</systemPropertyVariables>
+ <threadCount>4</threadCount>
</configuration>
</execution>
</executions>
http://git-wip-us.apache.org/repos/asf/beam/blob/c9aac967/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
index dc4592c..daf65d9 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
@@ -30,11 +30,9 @@ import org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.runners.core.construction.PTransformReplacements;
import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
import org.apache.beam.runners.gearpump.translators.CreateGearpumpPCollectionViewTranslator;
-import org.apache.beam.runners.gearpump.translators.CreatePCollectionViewTranslator;
import org.apache.beam.runners.gearpump.translators.FlattenPCollectionsTranslator;
import org.apache.beam.runners.gearpump.translators.GroupByKeyTranslator;
import org.apache.beam.runners.gearpump.translators.ParDoMultiOutputTranslator;
-import org.apache.beam.runners.gearpump.translators.ParDoSingleOutputTranslator;
import org.apache.beam.runners.gearpump.translators.ReadBoundedTranslator;
import org.apache.beam.runners.gearpump.translators.ReadUnboundedTranslator;
import org.apache.beam.runners.gearpump.translators.TransformTranslator;
@@ -73,7 +71,7 @@ import org.slf4j.LoggerFactory;
* into Gearpump {@link Graph}.
*/
@SuppressWarnings({"rawtypes", "unchecked"})
-public class GearpumpPipelineTranslator implements Pipeline.PipelineVisitor {
+public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
private static final Logger LOG = LoggerFactory.getLogger(
GearpumpPipelineTranslator.class);
@@ -89,7 +87,6 @@ public class GearpumpPipelineTranslator implements Pipeline.PipelineVisitor {
static {
// register TransformTranslators
- registerTransformTranslator(ParDo.SingleOutput.class, new ParDoSingleOutputTranslator());
registerTransformTranslator(Read.Unbounded.class, new ReadUnboundedTranslator());
registerTransformTranslator(Read.Bounded.class, new ReadBoundedTranslator());
registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator());
@@ -97,8 +94,6 @@ public class GearpumpPipelineTranslator implements Pipeline.PipelineVisitor {
new FlattenPCollectionsTranslator());
registerTransformTranslator(ParDo.MultiOutput.class, new ParDoMultiOutputTranslator());
registerTransformTranslator(Window.Assign.class, new WindowAssignTranslator());
- registerTransformTranslator(View.CreatePCollectionView.class,
- new CreatePCollectionViewTranslator());
registerTransformTranslator(CreateGearpumpPCollectionView.class,
new CreateGearpumpPCollectionViewTranslator<>());
}
@@ -155,7 +150,7 @@ public class GearpumpPipelineTranslator implements Pipeline.PipelineVisitor {
throw new IllegalStateException(
"no translator registered for " + transform);
}
- translationContext.setCurrentTransform(node);
+ translationContext.setCurrentTransform(node, getPipeline());
translator.translate(transform, translationContext);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c9aac967/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
index 6df3f2d..30b1935 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
@@ -95,19 +95,22 @@ public class GearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
*/
private Config registerSerializers(Config config, Map<String, String> userSerializers) {
Map<String, String> serializers = new HashMap<>();
+ serializers.put("org.apache.beam.sdk.util.WindowedValue$ValueInGlobalWindow", "");
serializers.put("org.apache.beam.sdk.util.WindowedValue$TimestampedValueInSingleWindow", "");
+ serializers.put("org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow", "");
+ serializers.put("org.apache.beam.sdk.util.WindowedValue$TimestampedValueInMultipleWindows", "");
serializers.put("org.apache.beam.sdk.transforms.windowing.PaneInfo", "");
serializers.put("org.apache.beam.sdk.transforms.windowing.PaneInfo$Timing", "");
serializers.put("org.joda.time.Instant", "");
serializers.put("org.apache.beam.sdk.values.KV", "");
serializers.put("org.apache.beam.sdk.transforms.windowing.IntervalWindow", "");
serializers.put("org.apache.beam.sdk.values.TimestampedValue", "");
+
if (userSerializers != null && !userSerializers.isEmpty()) {
serializers.putAll(userSerializers);
}
+
return config.withValue(GEARPUMP_SERIALIZERS, ConfigValueFactory.fromMap(serializers));
}
-
-
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c9aac967/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslator.java
deleted file mode 100644
index da55d70..0000000
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslator.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.gearpump.translators;
-
-import java.util.List;
-
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-
-/**
- * View.CreatePCollectionView bridges input stream to down stream
- * transforms.
- */
-public class CreatePCollectionViewTranslator<ElemT, ViewT> implements
- TransformTranslator<View.CreatePCollectionView<ElemT, ViewT>> {
-
- private static final long serialVersionUID = -2394386873317515748L;
-
- @Override
- public void translate(View.CreatePCollectionView<ElemT, ViewT> transform,
- TranslationContext context) {
- JavaStream<WindowedValue<List<ElemT>>> inputStream =
- context.getInputStream(context.getInput());
- PCollectionView<ViewT> view = (PCollectionView<ViewT>) context.getOutput();
- context.setOutputStream(view, inputStream);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c9aac967/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoSingleOutputTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoSingleOutputTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoSingleOutputTranslator.java
deleted file mode 100644
index 6b0e610..0000000
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoSingleOutputTranslator.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.gearpump.translators;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.beam.runners.gearpump.translators.functions.DoFnFunction;
-import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
-
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-
-/**
- * {@link ParDo.SingleOutput} is translated to Gearpump flatMap function
- * with {@link DoFn} wrapped in {@link DoFnFunction}.
- */
-public class ParDoSingleOutputTranslator<InputT, OutputT> implements
- TransformTranslator<ParDo.SingleOutput<InputT, OutputT>> {
-
- private static final long serialVersionUID = -3413205558160983784L;
- private final TupleTag<OutputT> mainOutput = new TupleTag<>();
- private final List<TupleTag<?>> sideOutputs = TupleTagList.empty().getAll();
-
- @Override
- public void translate(ParDo.SingleOutput<InputT, OutputT> transform, TranslationContext context) {
- DoFn<InputT, OutputT> doFn = transform.getFn();
- PCollection<OutputT> output = (PCollection<OutputT>) context.getOutput();
- WindowingStrategy<?, ?> windowingStrategy = output.getWindowingStrategy();
-
- Collection<PCollectionView<?>> sideInputs = transform.getSideInputs();
- Map<String, PCollectionView<?>> tagsToSideInputs =
- TranslatorUtils.getTagsToSideInputs(sideInputs);
- JavaStream<WindowedValue<InputT>> inputStream = context.getInputStream(
- context.getInput());
- JavaStream<TranslatorUtils.RawUnionValue> unionStream =
- TranslatorUtils.withSideInputStream(context,
- inputStream, tagsToSideInputs);
-
- DoFnFunction<InputT, OutputT> doFnFunction = new DoFnFunction<>(context.getPipelineOptions(),
- doFn, windowingStrategy, sideInputs, tagsToSideInputs,
- mainOutput, sideOutputs);
-
- JavaStream<WindowedValue<OutputT>> outputStream =
- TranslatorUtils.toList(unionStream)
- .flatMap(doFnFunction, transform.getName())
- .map(new TranslatorUtils.FromRawUnionValue<OutputT>(), "from_RawUnionValue");
-
- context.setOutputStream(context.getOutput(), outputStream);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c9aac967/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
index c3db044..4090354 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
+import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.values.PValue;
@@ -52,8 +53,8 @@ public class TranslationContext {
this.pipelineOptions = pipelineOptions;
}
- public void setCurrentTransform(TransformHierarchy.Node treeNode) {
- this.currentTransform = treeNode.toAppliedPTransform();
+ public void setCurrentTransform(TransformHierarchy.Node treeNode, Pipeline pipeline) {
+ this.currentTransform = treeNode.toAppliedPTransform(pipeline);
}
public GearpumpPipelineOptions getPipelineOptions() {
http://git-wip-us.apache.org/repos/asf/beam/blob/c9aac967/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
index d38f11b..35cf2b5 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
@@ -24,11 +24,11 @@ import java.util.List;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
-import org.apache.beam.runners.core.ExecutionContext;
import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
import org.apache.beam.runners.core.ReadyCheckingSideInputReader;
import org.apache.beam.runners.core.SimpleDoFnRunner;
import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
+import org.apache.beam.runners.core.StepContext;
import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
@@ -48,7 +48,7 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable {
private final DoFnRunners.OutputManager outputManager;
private final TupleTag<OutputT> mainOutputTag;
private final List<TupleTag<?>> sideOutputTags;
- private final ExecutionContext.StepContext stepContext;
+ private final StepContext stepContext;
private final WindowingStrategy<?, ?> windowingStrategy;
public DoFnRunnerFactory(
@@ -58,7 +58,7 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable {
DoFnRunners.OutputManager outputManager,
TupleTag<OutputT> mainOutputTag,
List<TupleTag<?>> sideOutputTags,
- ExecutionContext.StepContext stepContext,
+ StepContext stepContext,
WindowingStrategy<?, ?> windowingStrategy) {
this.fn = doFn;
this.options = pipelineOptions;
http://git-wip-us.apache.org/repos/asf/beam/blob/c9aac967/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
index 64fd615..b795ed9 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
@@ -18,47 +18,16 @@
package org.apache.beam.runners.gearpump.translators.utils;
-import java.io.IOException;
import java.io.Serializable;
-import org.apache.beam.runners.core.ExecutionContext;
import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StepContext;
import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
/**
- * serializable {@link ExecutionContext.StepContext} that basically does nothing.
+ * serializable {@link StepContext} that basically does nothing.
*/
-public class NoOpStepContext implements ExecutionContext.StepContext, Serializable {
-
- @Override
- public String getStepName() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public String getTransformName() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void noteOutput(WindowedValue<?> output) {
- }
-
- @Override
- public void noteOutput(TupleTag<?> tag, WindowedValue<?> output) {
-
- }
-
- @Override
- public <T, W extends BoundedWindow> void writePCollectionViewData(TupleTag<?> tag,
- Iterable<WindowedValue<T>> data,
- Coder<Iterable<WindowedValue<T>>> dataCoder, W window, Coder<W> windowCoder) throws
- IOException {
- }
+public class NoOpStepContext implements StepContext, Serializable {
@Override
public StateInternals stateInternals() {
http://git-wip-us.apache.org/repos/asf/beam/blob/c9aac967/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
index b8f0ccb..999afae 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
@@ -74,7 +74,6 @@ public class TranslatorUtils {
for (Map.Entry<String, PCollectionView<?>> tagToSideInput: tagsToSideInputs.entrySet()) {
// actually JavaStream<WindowedValue<List<?>>>
- // check CreatePCollectionViewTranslator
JavaStream<WindowedValue<Object>> sideInputStream = context.getInputStream(
tagToSideInput.getValue());
mainStream = mainStream.merge(sideInputStream.map(new ToRawUnionValue<>(
http://git-wip-us.apache.org/repos/asf/beam/blob/c9aac967/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslatorTest.java
deleted file mode 100644
index 42ff14e..0000000
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslatorTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.gearpump.translators;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-import org.junit.Test;
-
-/** Tests for {@link CreatePCollectionViewTranslator}. */
-public class CreatePCollectionViewTranslatorTest {
-
- @Test
- @SuppressWarnings({"rawtypes", "unchecked"})
- public void testTranslate() {
- CreatePCollectionViewTranslator translator = new CreatePCollectionViewTranslator();
- View.CreatePCollectionView<String, Iterable<String>> createView =
- mock(View.CreatePCollectionView.class);
-
- JavaStream javaStream = mock(JavaStream.class);
- TranslationContext translationContext = mock(TranslationContext.class);
-
- PValue mockInput = mock(PValue.class);
- when(translationContext.getInput()).thenReturn(mockInput);
- when(translationContext.getInputStream(mockInput)).thenReturn(javaStream);
-
- PCollectionView view = mock(PCollectionView.class);
- when(translationContext.getOutput()).thenReturn(view);
-
- translator.translate(createView, translationContext);
- verify(translationContext, times(1)).setOutputStream(view, javaStream);
- }
-}