You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by go...@apache.org on 2019/03/22 00:27:18 UTC
[beam] branch master updated: [BEAM-6865] share non-Flink-specific pipeline helper utils
This is an automated email from the ASF dual-hosted git repository.
goenka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 1f1660f [BEAM-6865] share non-Flink-specific pipeline helper utils
new 85df199 Merge pull request #8085 from ibzib/shared-utils
1f1660f is described below
commit 1f1660f0c97e65bb91e2c5aa7694b6d7b9215bbd
Author: Kyle Weaver <kc...@google.com>
AuthorDate: Tue Mar 19 15:34:48 2019 -0700
[BEAM-6865] share non-Flink-specific pipeline helper utils
---
.../flink/FlinkBatchPortablePipelineTranslator.java | 6 +++---
.../FlinkStreamingPortablePipelineTranslator.java | 9 ++++-----
.../translation/PipelineTranslatorUtils.java} | 6 +++---
.../fnexecution/translation/package-info.java | 20 ++++++++++++++++++++
.../translation/PipelineTranslatorUtilsTest.java} | 11 ++++-------
5 files changed, 34 insertions(+), 18 deletions(-)
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
index 2f49e3d..29ac2b0 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchPortablePipelineTranslator.java
@@ -18,9 +18,9 @@
package org.apache.beam.runners.flink;
import static org.apache.beam.runners.core.construction.ExecutableStageTranslation.generateNameFromStagePayload;
-import static org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils.createOutputMap;
-import static org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils.getWindowingStrategy;
-import static org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils.instantiateCoder;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.createOutputMap;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowingStrategy;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.instantiateCoder;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
import com.google.auto.service.AutoService;
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
index 1c57824..df1c875 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java
@@ -19,8 +19,9 @@ package org.apache.beam.runners.flink;
import static java.lang.String.format;
import static org.apache.beam.runners.core.construction.ExecutableStageTranslation.generateNameFromStagePayload;
-import static org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils.getWindowingStrategy;
-import static org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils.instantiateCoder;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.createOutputMap;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.getWindowingStrategy;
+import static org.apache.beam.runners.fnexecution.translation.PipelineTranslatorUtils.instantiateCoder;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -51,7 +52,6 @@ import org.apache.beam.runners.core.construction.graph.QueryablePipeline;
import org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageContext;
import org.apache.beam.runners.flink.translation.functions.ImpulseSourceFunction;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
-import org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils;
import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.ExecutableStageDoFnOperator;
import org.apache.beam.runners.flink.translation.wrappers.streaming.KvToByteBufferKeySelector;
@@ -675,8 +675,7 @@ public class FlinkStreamingPortablePipelineTranslator
outputs.isEmpty() ? null : new TupleTag(outputs.keySet().iterator().next());
// associate output tags with ids, output manager uses these Integer ids to serialize state
- BiMap<String, Integer> outputIndexMap =
- FlinkPipelineTranslatorUtils.createOutputMap(outputs.keySet());
+ BiMap<String, Integer> outputIndexMap = createOutputMap(outputs.keySet());
Map<String, Coder<WindowedValue<?>>> outputCoders = Maps.newHashMap();
Map<TupleTag<?>, Integer> tagsToIds = Maps.newHashMap();
Map<String, TupleTag<?>> collectionIdToTupleTag = Maps.newHashMap();
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/FlinkPipelineTranslatorUtils.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
similarity index 95%
rename from runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/FlinkPipelineTranslatorUtils.java
rename to runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
index b1943a9..c39b8b7 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/FlinkPipelineTranslatorUtils.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtils.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.runners.flink.translation.utils;
+package org.apache.beam.runners.fnexecution.translation;
import java.io.IOException;
import org.apache.beam.model.pipeline.v1.RunnerApi;
@@ -32,8 +32,8 @@ import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableBiM
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;
/** Utilities for pipeline translation. */
-public final class FlinkPipelineTranslatorUtils {
- private FlinkPipelineTranslatorUtils() {}
+public final class PipelineTranslatorUtils {
+ private PipelineTranslatorUtils() {}
/** Creates a mapping from PCollection id to output tag integer. */
public static BiMap<String, Integer> createOutputMap(Iterable<String> localOutputs) {
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/package-info.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/package-info.java
new file mode 100644
index 0000000..7d000dd
--- /dev/null
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/translation/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Shared utilities for a Beam runner to translate portable pipelines. */
+package org.apache.beam.runners.fnexecution.translation;
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslatorUtilsTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtilsTest.java
similarity index 78%
rename from runners/flink/src/test/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslatorUtilsTest.java
rename to runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtilsTest.java
index 1751982..a37b12b 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslatorUtilsTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/translation/PipelineTranslatorUtilsTest.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.runners.flink.translation;
+package org.apache.beam.runners.fnexecution.translation;
import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
@@ -23,20 +23,17 @@ import static org.junit.Assert.assertThat;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
-import org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.BiMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.junit.Test;
-/**
- * Tests for {@link org.apache.beam.runners.flink.translation.utils.FlinkPipelineTranslatorUtils}.
- */
-public class FlinkPipelineTranslatorUtilsTest {
+/** Tests for {@link PipelineTranslatorUtils}. */
+public class PipelineTranslatorUtilsTest {
@Test
public void testOutputMapCreation() {
List<String> outputs = Arrays.asList("output1", "output2", "output3");
- BiMap<String, Integer> outputMap = FlinkPipelineTranslatorUtils.createOutputMap(outputs);
+ BiMap<String, Integer> outputMap = PipelineTranslatorUtils.createOutputMap(outputs);
Map<Object, Object> expected =
ImmutableMap.builder().put("output1", 0).put("output2", 1).put("output3", 2).build();
assertThat(outputMap, is(expected));