You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by go...@apache.org on 2019/03/22 00:27:18 UTC

[beam] branch master updated: [BEAM-6865] share non-Flink-specific pipeline helper utils

This is an automated email from the ASF dual-hosted git repository.

goenka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f1660f  [BEAM-6865] share non-Flink-specific pipeline helper utils
     new 85df199  Merge pull request #8085 from ibzib/shared-utils
1f1660f is described below

commit 1f1660f0c97e65bb91e2c5aa7694b6d7b9215bbd
Author: Kyle Weaver <kc...@google.com>
AuthorDate: Tue Mar 19 15:34:48 2019 -0700

    [BEAM-6865] share non-Flink-specific pipeline helper utils
---
 .../flink/FlinkBatchPortablePipelineTranslator.java  |  6 +++---
 .../FlinkStreamingPortablePipelineTranslator.java    |  9 ++++-----
 .../translation/PipelineTranslatorUtils.java}        |  6 +++---
 .../fnexecution/translation/package-info.java        | 20 ++++++++++++++++++++
 .../translation/PipelineTranslatorUtilsTest.java}    | 11 ++++-------
 5 files changed, 34 insertions(+), 18 deletions(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
index 2f49e3d..29ac2b0 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
@@ -18,9 +18,9 @@
 package org.apache.beam.runners.flink;
 
 import static org.apache.beam.runners.core.construction.ExecutableStageTranslation.generateNameFromStagePayload;
-import static org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils.createOutputMap;
-import static org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils.getWindowingStrategy;
-import static org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils.instantiateCoder;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.createOutputMap;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowingStrategy;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.instantiateCoder;
 import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
 
 import com.google.auto.service.AutoService;
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
index 1c57824..df1c875 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
@@ -19,8 +19,9 @@ package org.apache.beam.runners.flink;
 
 import static java.lang.String.format;
 import static org.apache.beam.runners.core.construction.ExecutableStageTranslation.generateNameFromStagePayload;
-import static org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils.getWindowingStrategy;
-import static org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils.instantiateCoder;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.createOutputMap;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowingStrategy;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.instantiateCoder;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -51,7 +52,6 @@ import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
 import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext;
 import org.apache.beam.runners.flink.translation.functions.ImpulseSourceFunction;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
-import org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.KvToByteBufferKeySelector;
@@ -675,8 +675,7 @@ public class FlinkStreamingPortablePipelineTranslator
         outputs.isEmpty() ? null : new TupleTag(outputs.keySet().iterator().next());
 
     // associate output tags with ids, output manager uses these Integer ids to serialize state
-    BiMap<String, Integer> outputIndexMap =
-        FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet());
+    BiMap<String, Integer> outputIndexMap = createOutputMap(outputs.keySet());
     Map<String, Coder<WindowedValue<?>>> outputCoders = Maps.newHashMap();
     Map<TupleTag<?>, Integer> tagsToIds = Maps.newHashMap();
     Map<String, TupleTag<?>> collectionIdToTupleTag = Maps.newHashMap();
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/FlinkPipelineTranslatorUtils.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
similarity index 95%
rename from runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/FlinkPipelineTranslatorUtils.java
rename to runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
index b1943a9..c39b8b7 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/FlinkPipelineTranslatorUtils.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.flink.translation.utils;
+package org.apache.beam.runners.fnexecution.translation;
 
 import java.io.IOException;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
@@ -32,8 +32,8 @@ import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableBiM
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;
 
 /** Utilities for pipeline translation. */
-public final class FlinkPipelineTranslatorUtils {
-  private FlinkPipelineTranslatorUtils() {}
+public final class PipelineTranslatorUtils {
+  private PipelineTranslatorUtils() {}
 
   /** Creates a mapping from PCollection id to output tag integer. */
   public static BiMap<String, Integer> createOutputMap(Iterable<String> localOutputs) {
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/package-info.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/package-info.java
new file mode 100644
index 0000000..7d000dd
--- /dev/null
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Shared utilities for a Beam runner to translate portable pipelines. */
+package org.apache.beam.runners.fnexecution.translation;
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslatorUtilsTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtilsTest.java
similarity index 78%
rename from runners/flink/src/test/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslatorUtilsTest.java
rename to runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtilsTest.java
index 1751982..a37b12b 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslatorUtilsTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtilsTest.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.flink.translation;
+package org.apache.beam.runners.fnexecution.translation;
 
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
@@ -23,20 +23,17 @@ import static org.junit.Assert.assertThat;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.BiMap;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
 import org.junit.Test;
 
-/**
- * Tests for {@link org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils}.
- */
-public class FlinkPipelineTranslatorUtilsTest {
+/** Tests for {@link PipelineTranslatorUtils}. */
+public class PipelineTranslatorUtilsTest {
 
   @Test
   public void testOutputMapCreation() {
     List<String> outputs = Arrays.asList("output1", "output2", "output3");
-    BiMap<String, Integer> outputMap = FlinkPipelineTranslatorUtils.createOutputMap(outputs);
+    BiMap<String, Integer> outputMap = PipelineTranslatorUtils.createOutputMap(outputs);
     Map<Object, Object> expected =
         ImmutableMap.builder().put("output1", 0).put("output2", 1).put("output3", 2).build();
     assertThat(outputMap, is(expected));