You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/08/07 23:53:50 UTC

[25/50] [abbrv] beam git commit: Update against master changes

Update against master changes


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c9aac967
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c9aac967
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c9aac967

Branch: refs/heads/master
Commit: c9aac967bccba5c3e1a0a3e4d84a8def1bfa2581
Parents: 9e6c906
Author: manuzhang <ow...@gmail.com>
Authored: Mon Jun 5 12:38:30 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Mon Jun 5 16:29:25 2017 +0800

----------------------------------------------------------------------
 runners/gearpump/pom.xml                        |  1 +
 .../gearpump/GearpumpPipelineTranslator.java    |  9 +--
 .../beam/runners/gearpump/GearpumpRunner.java   |  7 +-
 .../CreatePCollectionViewTranslator.java        | 45 ------------
 .../ParDoSingleOutputTranslator.java            | 75 --------------------
 .../translators/TranslationContext.java         |  5 +-
 .../translators/utils/DoFnRunnerFactory.java    |  6 +-
 .../translators/utils/NoOpStepContext.java      | 37 +---------
 .../translators/utils/TranslatorUtils.java      |  1 -
 .../CreatePCollectionViewTranslatorTest.java    | 55 --------------
 10 files changed, 17 insertions(+), 224 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c9aac967/runners/gearpump/pom.xml
----------------------------------------------------------------------
diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml
index d4dade1..beb7753 100644
--- a/runners/gearpump/pom.xml
+++ b/runners/gearpump/pom.xml
@@ -86,6 +86,7 @@
                       ]
                     </beamTestPipelineOptions>
                   </systemPropertyVariables>
+                  <threadCount>4</threadCount>
                 </configuration>
               </execution>
             </executions>

http://git-wip-us.apache.org/repos/asf/beam/blob/c9aac967/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
index dc4592c..daf65d9 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java
@@ -30,11 +30,9 @@ import org.apache.beam.runners.core.construction.PTransformMatchers;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
 import org.apache.beam.runners.gearpump.translators.CreateGearpumpPCollectionViewTranslator;
-import org.apache.beam.runners.gearpump.translators.CreatePCollectionViewTranslator;
 import org.apache.beam.runners.gearpump.translators.FlattenPCollectionsTranslator;
 import org.apache.beam.runners.gearpump.translators.GroupByKeyTranslator;
 import org.apache.beam.runners.gearpump.translators.ParDoMultiOutputTranslator;
-import org.apache.beam.runners.gearpump.translators.ParDoSingleOutputTranslator;
 import org.apache.beam.runners.gearpump.translators.ReadBoundedTranslator;
 import org.apache.beam.runners.gearpump.translators.ReadUnboundedTranslator;
 import org.apache.beam.runners.gearpump.translators.TransformTranslator;
@@ -73,7 +71,7 @@ import org.slf4j.LoggerFactory;
  * into Gearpump {@link Graph}.
  */
 @SuppressWarnings({"rawtypes", "unchecked"})
-public class GearpumpPipelineTranslator implements Pipeline.PipelineVisitor {
+public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Defaults {
 
   private static final Logger LOG = LoggerFactory.getLogger(
       GearpumpPipelineTranslator.class);
@@ -89,7 +87,6 @@ public class GearpumpPipelineTranslator implements Pipeline.PipelineVisitor {
 
   static {
     // register TransformTranslators
-    registerTransformTranslator(ParDo.SingleOutput.class, new ParDoSingleOutputTranslator());
     registerTransformTranslator(Read.Unbounded.class, new ReadUnboundedTranslator());
     registerTransformTranslator(Read.Bounded.class, new ReadBoundedTranslator());
     registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator());
@@ -97,8 +94,6 @@ public class GearpumpPipelineTranslator implements Pipeline.PipelineVisitor {
         new FlattenPCollectionsTranslator());
     registerTransformTranslator(ParDo.MultiOutput.class, new ParDoMultiOutputTranslator());
     registerTransformTranslator(Window.Assign.class, new WindowAssignTranslator());
-    registerTransformTranslator(View.CreatePCollectionView.class,
-        new CreatePCollectionViewTranslator());
     registerTransformTranslator(CreateGearpumpPCollectionView.class,
         new CreateGearpumpPCollectionViewTranslator<>());
   }
@@ -155,7 +150,7 @@ public class GearpumpPipelineTranslator implements Pipeline.PipelineVisitor {
       throw new IllegalStateException(
           "no translator registered for " + transform);
     }
-    translationContext.setCurrentTransform(node);
+    translationContext.setCurrentTransform(node, getPipeline());
     translator.translate(transform, translationContext);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c9aac967/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
index 6df3f2d..30b1935 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java
@@ -95,19 +95,22 @@ public class GearpumpRunner extends PipelineRunner<GearpumpPipelineResult> {
    */
   private Config registerSerializers(Config config, Map<String, String> userSerializers) {
     Map<String, String> serializers = new HashMap<>();
+    serializers.put("org.apache.beam.sdk.util.WindowedValue$ValueInGlobalWindow", "");
     serializers.put("org.apache.beam.sdk.util.WindowedValue$TimestampedValueInSingleWindow", "");
+    serializers.put("org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow", "");
+    serializers.put("org.apache.beam.sdk.util.WindowedValue$TimestampedValueInMultipleWindows", "");
     serializers.put("org.apache.beam.sdk.transforms.windowing.PaneInfo", "");
     serializers.put("org.apache.beam.sdk.transforms.windowing.PaneInfo$Timing", "");
     serializers.put("org.joda.time.Instant", "");
     serializers.put("org.apache.beam.sdk.values.KV", "");
     serializers.put("org.apache.beam.sdk.transforms.windowing.IntervalWindow", "");
     serializers.put("org.apache.beam.sdk.values.TimestampedValue", "");
+
     if (userSerializers != null && !userSerializers.isEmpty()) {
       serializers.putAll(userSerializers);
     }
+
     return config.withValue(GEARPUMP_SERIALIZERS, ConfigValueFactory.fromMap(serializers));
   }
 
-
-
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c9aac967/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslator.java
deleted file mode 100644
index da55d70..0000000
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslator.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.gearpump.translators;
-
-import java.util.List;
-
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-
-/**
- * View.CreatePCollectionView bridges input stream to down stream
- * transforms.
- */
-public class CreatePCollectionViewTranslator<ElemT, ViewT> implements
-    TransformTranslator<View.CreatePCollectionView<ElemT, ViewT>> {
-
-  private static final long serialVersionUID = -2394386873317515748L;
-
-  @Override
-  public void translate(View.CreatePCollectionView<ElemT, ViewT> transform,
-                        TranslationContext context) {
-    JavaStream<WindowedValue<List<ElemT>>> inputStream =
-        context.getInputStream(context.getInput());
-    PCollectionView<ViewT> view = (PCollectionView<ViewT>) context.getOutput();
-    context.setOutputStream(view, inputStream);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c9aac967/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoSingleOutputTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoSingleOutputTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoSingleOutputTranslator.java
deleted file mode 100644
index 6b0e610..0000000
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoSingleOutputTranslator.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.gearpump.translators;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.beam.runners.gearpump.translators.functions.DoFnFunction;
-import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
-
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-
-/**
- * {@link ParDo.SingleOutput} is translated to Gearpump flatMap function
- * with {@link DoFn} wrapped in {@link DoFnFunction}.
- */
-public class ParDoSingleOutputTranslator<InputT, OutputT> implements
-    TransformTranslator<ParDo.SingleOutput<InputT, OutputT>> {
-
-  private static final long serialVersionUID = -3413205558160983784L;
-  private final TupleTag<OutputT> mainOutput = new TupleTag<>();
-  private final List<TupleTag<?>> sideOutputs = TupleTagList.empty().getAll();
-
-  @Override
-  public void translate(ParDo.SingleOutput<InputT, OutputT> transform, TranslationContext context) {
-    DoFn<InputT, OutputT> doFn = transform.getFn();
-    PCollection<OutputT> output = (PCollection<OutputT>) context.getOutput();
-    WindowingStrategy<?, ?> windowingStrategy = output.getWindowingStrategy();
-
-    Collection<PCollectionView<?>> sideInputs = transform.getSideInputs();
-    Map<String, PCollectionView<?>> tagsToSideInputs =
-        TranslatorUtils.getTagsToSideInputs(sideInputs);
-    JavaStream<WindowedValue<InputT>> inputStream = context.getInputStream(
-        context.getInput());
-    JavaStream<TranslatorUtils.RawUnionValue> unionStream =
-        TranslatorUtils.withSideInputStream(context,
-        inputStream, tagsToSideInputs);
-
-    DoFnFunction<InputT, OutputT> doFnFunction = new DoFnFunction<>(context.getPipelineOptions(),
-        doFn, windowingStrategy, sideInputs, tagsToSideInputs,
-        mainOutput, sideOutputs);
-
-    JavaStream<WindowedValue<OutputT>> outputStream =
-        TranslatorUtils.toList(unionStream)
-            .flatMap(doFnFunction, transform.getName())
-            .map(new TranslatorUtils.FromRawUnionValue<OutputT>(), "from_RawUnionValue");
-
-    context.setOutputStream(context.getOutput(), outputStream);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c9aac967/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
index c3db044..4090354 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/TranslationContext.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.values.PValue;
@@ -52,8 +53,8 @@ public class TranslationContext {
     this.pipelineOptions = pipelineOptions;
   }
 
-  public void setCurrentTransform(TransformHierarchy.Node treeNode) {
-    this.currentTransform = treeNode.toAppliedPTransform();
+  public void setCurrentTransform(TransformHierarchy.Node treeNode, Pipeline pipeline) {
+    this.currentTransform = treeNode.toAppliedPTransform(pipeline);
   }
 
   public GearpumpPipelineOptions getPipelineOptions() {

http://git-wip-us.apache.org/repos/asf/beam/blob/c9aac967/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
index d38f11b..35cf2b5 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/DoFnRunnerFactory.java
@@ -24,11 +24,11 @@ import java.util.List;
 
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
-import org.apache.beam.runners.core.ExecutionContext;
 import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.ReadyCheckingSideInputReader;
 import org.apache.beam.runners.core.SimpleDoFnRunner;
 import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner;
+import org.apache.beam.runners.core.StepContext;
 import org.apache.beam.runners.gearpump.GearpumpPipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -48,7 +48,7 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable {
   private final DoFnRunners.OutputManager outputManager;
   private final TupleTag<OutputT> mainOutputTag;
   private final List<TupleTag<?>> sideOutputTags;
-  private final ExecutionContext.StepContext stepContext;
+  private final StepContext stepContext;
   private final WindowingStrategy<?, ?> windowingStrategy;
 
   public DoFnRunnerFactory(
@@ -58,7 +58,7 @@ public class DoFnRunnerFactory<InputT, OutputT> implements Serializable {
       DoFnRunners.OutputManager outputManager,
       TupleTag<OutputT> mainOutputTag,
       List<TupleTag<?>> sideOutputTags,
-      ExecutionContext.StepContext stepContext,
+      StepContext stepContext,
       WindowingStrategy<?, ?> windowingStrategy) {
     this.fn = doFn;
     this.options = pipelineOptions;

http://git-wip-us.apache.org/repos/asf/beam/blob/c9aac967/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
index 64fd615..b795ed9 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/NoOpStepContext.java
@@ -18,47 +18,16 @@
 
 package org.apache.beam.runners.gearpump.translators.utils;
 
-import java.io.IOException;
 import java.io.Serializable;
 
-import org.apache.beam.runners.core.ExecutionContext;
 import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StepContext;
 import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
 
 /**
- * serializable {@link ExecutionContext.StepContext} that basically does nothing.
+ * serializable {@link StepContext} that basically does nothing.
  */
-public class NoOpStepContext implements ExecutionContext.StepContext, Serializable {
-
-  @Override
-  public String getStepName() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public String getTransformName() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void noteOutput(WindowedValue<?> output) {
-  }
-
-  @Override
-  public void noteOutput(TupleTag<?> tag, WindowedValue<?> output) {
-
-  }
-
-  @Override
-  public <T, W extends BoundedWindow> void writePCollectionViewData(TupleTag<?> tag,
-      Iterable<WindowedValue<T>> data,
-      Coder<Iterable<WindowedValue<T>>> dataCoder, W window, Coder<W> windowCoder) throws
-      IOException {
-  }
+public class NoOpStepContext implements StepContext, Serializable {
 
   @Override
   public StateInternals stateInternals() {

http://git-wip-us.apache.org/repos/asf/beam/blob/c9aac967/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
index b8f0ccb..999afae 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java
@@ -74,7 +74,6 @@ public class TranslatorUtils {
 
     for (Map.Entry<String, PCollectionView<?>> tagToSideInput: tagsToSideInputs.entrySet()) {
       // actually JavaStream<WindowedValue<List<?>>>
-      // check CreatePCollectionViewTranslator
       JavaStream<WindowedValue<Object>> sideInputStream = context.getInputStream(
           tagToSideInput.getValue());
       mainStream = mainStream.merge(sideInputStream.map(new ToRawUnionValue<>(

http://git-wip-us.apache.org/repos/asf/beam/blob/c9aac967/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslatorTest.java
deleted file mode 100644
index 42ff14e..0000000
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslatorTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.gearpump.translators;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.transforms.View;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
-import org.junit.Test;
-
-/** Tests for {@link CreatePCollectionViewTranslator}. */
-public class CreatePCollectionViewTranslatorTest {
-
-  @Test
-  @SuppressWarnings({"rawtypes", "unchecked"})
-  public void testTranslate() {
-    CreatePCollectionViewTranslator translator = new CreatePCollectionViewTranslator();
-    View.CreatePCollectionView<String, Iterable<String>> createView =
-        mock(View.CreatePCollectionView.class);
-
-    JavaStream javaStream = mock(JavaStream.class);
-    TranslationContext translationContext = mock(TranslationContext.class);
-
-    PValue mockInput = mock(PValue.class);
-    when(translationContext.getInput()).thenReturn(mockInput);
-    when(translationContext.getInputStream(mockInput)).thenReturn(javaStream);
-
-    PCollectionView view = mock(PCollectionView.class);
-    when(translationContext.getOutput()).thenReturn(view);
-
-    translator.translate(createView, translationContext);
-    verify(translationContext, times(1)).setOutputStream(view, javaStream);
-  }
-}