You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/03/19 16:24:28 UTC

[beam] branch master updated: [BEAM-9540] Rename beam:source:runner:0.1/beam:sink:runner:0.1 to beam:runner:source:v1/beam:runner:sink:v1

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f4c5f6  [BEAM-9540] Rename beam:source:runner:0.1/beam:sink:runner:0.1 to beam:runner:source:v1/beam:runner:sink:v1
     new c7fbfa8  Merge pull request #11157 from lukecwik/proto
9f4c5f6 is described below

commit 9f4c5f66bb3979aeeebd648f76cbfaca301a1753
Author: Luke Cwik <lc...@google.com>
AuthorDate: Wed Mar 18 09:15:54 2020 -0700

    [BEAM-9540] Rename beam:source:runner:0.1/beam:sink:runner:0.1 to beam:runner:source:v1/beam:runner:sink:v1
---
 .../dataflow/worker/graph/CreateExecutableStageNodeFunction.java      | 4 ++--
 .../beam/runners/dataflow/worker/graph/RegisterNodeFunction.java      | 4 ++--
 sdks/go/pkg/beam/core/runtime/exec/translate.go                       | 4 ++--
 sdks/go/pkg/beam/runners/session/session.go                           | 4 ++--
 .../src/main/java/org/apache/beam/sdk/fn/data/RemoteGrpcPortRead.java | 2 +-
 .../main/java/org/apache/beam/sdk/fn/data/RemoteGrpcPortWrite.java    | 2 +-
 .../java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java | 2 +-
 .../org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java  | 4 ++--
 sdks/python/apache_beam/runners/worker/bundle_processor.py            | 4 ++--
 9 files changed, 15 insertions(+), 15 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
index 1b888c6..8f7026e 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/CreateExecutableStageNodeFunction.java
@@ -94,9 +94,9 @@ import org.joda.time.Duration;
  */
 public class CreateExecutableStageNodeFunction
     implements Function<MutableNetwork<Node, Edge>, Node> {
-  private static final String DATA_INPUT_URN = "beam:source:runner:0.1";
+  private static final String DATA_INPUT_URN = "beam:runner:source:v1";
 
-  private static final String DATA_OUTPUT_URN = "beam:sink:runner:0.1";
+  private static final String DATA_OUTPUT_URN = "beam:runner:sink:v1";
   private static final String JAVA_SOURCE_URN = "beam:source:java:0.1";
 
   public static final String COMBINE_PER_KEY_URN =
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/RegisterNodeFunction.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/RegisterNodeFunction.java
index 0a25cf1..42217c8 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/RegisterNodeFunction.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/graph/RegisterNodeFunction.java
@@ -94,9 +94,9 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.graph.Network;
  */
 public class RegisterNodeFunction implements Function<MutableNetwork<Node, Edge>, Node> {
   /** Must match declared fields within {@code ProcessBundleHandler}. */
-  private static final String DATA_INPUT_URN = "beam:source:runner:0.1";
+  private static final String DATA_INPUT_URN = "beam:runner:source:v1";
 
-  private static final String DATA_OUTPUT_URN = "beam:sink:runner:0.1";
+  private static final String DATA_OUTPUT_URN = "beam:runner:sink:v1";
   private static final String JAVA_SOURCE_URN = "beam:source:java:0.1";
 
   public static final String COMBINE_PER_KEY_URN =
diff --git a/sdks/go/pkg/beam/core/runtime/exec/translate.go b/sdks/go/pkg/beam/core/runtime/exec/translate.go
index 8349cf3..d5ab8df 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/translate.go
@@ -37,8 +37,8 @@ import (
 
 // TODO(lostluck): 2018/05/28 Extract these from the canonical enums in beam_runner_api.proto
 const (
-	urnDataSource           = "beam:source:runner:0.1"
-	urnDataSink             = "beam:sink:runner:0.1"
+	urnDataSource           = "beam:runner:source:v1"
+	urnDataSink             = "beam:runner:sink:v1"
 	urnPerKeyCombinePre     = "beam:transform:combine_per_key_precombine:v1"
 	urnPerKeyCombineMerge   = "beam:transform:combine_per_key_merge_accumulators:v1"
 	urnPerKeyCombineExtract = "beam:transform:combine_per_key_extract_outputs:v1"
diff --git a/sdks/go/pkg/beam/runners/session/session.go b/sdks/go/pkg/beam/runners/session/session.go
index 8725bc1..d814882 100644
--- a/sdks/go/pkg/beam/runners/session/session.go
+++ b/sdks/go/pkg/beam/runners/session/session.go
@@ -180,11 +180,11 @@ func (c *controlServer) handleEntry(msg *session.Entry) {
 			for _, desc := range rr.GetProcessBundleDescriptor() {
 				for beamPort, t := range desc.GetTransforms() {
 					s := t.GetSpec()
-					if s.GetUrn() == "beam:source:runner:0.1" {
+					if s.GetUrn() == "beam:runner:source:v1" {
 						tcpPort := extractPortSpec(s)
 						c.establishDataChannel(beamPort, tcpPort)
 					}
-					if s.GetUrn() == "beam:sink:runner:0.1" {
+					if s.GetUrn() == "beam:runner:sink:v1" {
 						tcpPort := extractPortSpec(s)
 						c.establishDataChannel(beamPort, tcpPort)
 					}
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/RemoteGrpcPortRead.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/RemoteGrpcPortRead.java
index f06599e..1e205b1 100644
--- a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/RemoteGrpcPortRead.java
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/RemoteGrpcPortRead.java
@@ -32,7 +32,7 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterable
  */
 @AutoValue
 public abstract class RemoteGrpcPortRead {
-  public static final String URN = "beam:source:runner:0.1";
+  public static final String URN = "beam:runner:source:v1";
   private static final String LOCAL_OUTPUT_ID = "local_output";
 
   public static RemoteGrpcPortRead readFromPort(RemoteGrpcPort port, String outputPCollectionId) {
diff --git a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/RemoteGrpcPortWrite.java b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/RemoteGrpcPortWrite.java
index 42fd798..2b73659 100644
--- a/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/RemoteGrpcPortWrite.java
+++ b/sdks/java/fn-execution/src/main/java/org/apache/beam/sdk/fn/data/RemoteGrpcPortWrite.java
@@ -33,7 +33,7 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterable
  */
 @AutoValue
 public abstract class RemoteGrpcPortWrite {
-  public static final String URN = "beam:sink:runner:0.1";
+  public static final String URN = "beam:runner:sink:v1";
   private static final String LOCAL_INPUT_ID = "local_input";
 
   /**
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
index 48e2450..382c882 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -102,7 +102,7 @@ import org.slf4j.LoggerFactory;
 public class ProcessBundleHandler {
 
   // TODO: What should the initial set of URNs be?
-  private static final String DATA_INPUT_URN = "beam:source:runner:0.1";
+  private static final String DATA_INPUT_URN = "beam:runner:source:v1";
   public static final String JAVA_SOURCE_URN = "beam:source:java:0.1";
 
   private static final Logger LOG = LoggerFactory.getLogger(ProcessBundleHandler.class);
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
index f9aeb6b..e216aff 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
@@ -106,8 +106,8 @@ import org.mockito.MockitoAnnotations;
 /** Tests for {@link ProcessBundleHandler}. */
 @RunWith(JUnit4.class)
 public class ProcessBundleHandlerTest {
-  private static final String DATA_INPUT_URN = "beam:source:runner:0.1";
-  private static final String DATA_OUTPUT_URN = "beam:sink:runner:0.1";
+  private static final String DATA_INPUT_URN = "beam:runner:source:v1";
+  private static final String DATA_OUTPUT_URN = "beam:runner:sink:v1";
 
   @Rule public ExpectedException thrown = ExpectedException.none();
 
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 7ec045c..ccdae23 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -98,8 +98,8 @@ ConstructorFn = Callable[[
                          operations.Operation]
 OperationT = TypeVar('OperationT', bound=operations.Operation)
 
-DATA_INPUT_URN = 'beam:source:runner:0.1'
-DATA_OUTPUT_URN = 'beam:sink:runner:0.1'
+DATA_INPUT_URN = 'beam:runner:source:v1'
+DATA_OUTPUT_URN = 'beam:runner:sink:v1'
 IDENTITY_DOFN_URN = 'beam:dofn:identity:0.1'
 # TODO(vikasrk): Fix this once runner sends appropriate common_urns.
 OLD_DATAFLOW_RUNNER_HARNESS_PARDO_URN = 'beam:dofn:javasdk:0.1'