You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2021/10/20 03:04:20 UTC

[beam] branch master updated: [BEAM-13079] Updates cross-language transform URNs to use the new convention (#15748)

This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8578c9a  [BEAM-13079] Updates cross-language transform URNs to use the new convention (#15748)
8578c9a is described below

commit 8578c9a9352a8a5c9621647e9c585321f9761dc5
Author: Chamikara Jayalath <ch...@apache.org>
AuthorDate: Tue Oct 19 20:03:08 2021 -0700

    [BEAM-13079] Updates cross-language transform URNs to use the new convention (#15748)
    
    * Updates cross-language transform URNs to use the new convention
    
    * Fix the Spanner Insert URN.
    
    Co-authored-by: Brian Hulette <hu...@gmail.com>
    
    * Fix the Kafka test
    
    * Fixes the Pub/Sub test failure
    
    Co-authored-by: Brian Hulette <hu...@gmail.com>
---
 sdks/go/pkg/beam/io/xlang/kafkaio/kafka.go                   |  4 ++--
 .../expansion/ExternalSchemaIOTransformRegistrar.java        |  6 ++++--
 .../apache/beam/io/debezium/DebeziumTransformRegistrar.java  |  2 +-
 .../java/org/apache/beam/sdk/io/gcp/pubsub/ExternalRead.java |  2 +-
 .../org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java     |  2 +-
 .../beam/sdk/io/gcp/spanner/SpannerTransformRegistrar.java   | 12 ++++++------
 .../apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java  |  4 ++--
 .../src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java  |  6 +++---
 .../org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java    | 11 +++++++----
 .../beam/sdk/io/kinesis/KinesisTransformRegistrar.java       |  4 ++--
 .../snowflake/crosslanguage/SnowflakeTransformRegistrar.java |  4 ++--
 sdks/python/apache_beam/io/debezium.py                       |  2 +-
 sdks/python/apache_beam/io/external/gcp/pubsub.py            |  4 ++--
 sdks/python/apache_beam/io/gcp/spanner.py                    | 12 ++++++------
 sdks/python/apache_beam/io/jdbc.py                           |  4 ++--
 sdks/python/apache_beam/io/kafka.py                          |  8 +++++---
 sdks/python/apache_beam/io/kinesis.py                        |  4 ++--
 sdks/python/apache_beam/io/snowflake.py                      |  4 ++--
 18 files changed, 51 insertions(+), 44 deletions(-)

diff --git a/sdks/go/pkg/beam/io/xlang/kafkaio/kafka.go b/sdks/go/pkg/beam/io/xlang/kafkaio/kafka.go
index 8781cff..785508a 100644
--- a/sdks/go/pkg/beam/io/xlang/kafkaio/kafka.go
+++ b/sdks/go/pkg/beam/io/xlang/kafkaio/kafka.go
@@ -80,8 +80,8 @@ const (
 	// be found in Java's KafkaIO documentation.
 	LogAppendTime policy = "LogAppendTime"
 
-	readURN  = "beam:external:java:kafkaio:typedwithoutmetadata:v1"
-	writeURN = "beam:external:java:kafka:write:v1"
+	readURN  = "beam:transform:org.apache.beam:kafka_read_without_metadata:v1"
+	writeURN = "beam:transform:org.apache.beam:kafka_write:v1"
 )
 
 // Read is a cross-language PTransform which reads from Kafka and returns a
diff --git a/sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/ExternalSchemaIOTransformRegistrar.java b/sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/ExternalSchemaIOTransformRegistrar.java
index 4f3ab30..94ce7cf 100644
--- a/sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/ExternalSchemaIOTransformRegistrar.java
+++ b/sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/ExternalSchemaIOTransformRegistrar.java
@@ -54,10 +54,12 @@ public class ExternalSchemaIOTransformRegistrar implements ExternalTransformRegi
     try {
       for (SchemaIOProvider schemaIOProvider : ServiceLoader.load(SchemaIOProvider.class)) {
         builder.put(
-            "beam:external:java:schemaio:" + schemaIOProvider.identifier() + ":read:v1",
+            "beam:transform:org.apache.beam:schemaio_" + schemaIOProvider.identifier() + "_read:v1",
             new ReaderBuilder(schemaIOProvider));
         builder.put(
-            "beam:external:java:schemaio:" + schemaIOProvider.identifier() + ":write:v1",
+            "beam:transform:org.apache.beam:schemaio_"
+                + schemaIOProvider.identifier()
+                + "_write:v1",
             new WriterBuilder(schemaIOProvider));
       }
     } catch (Exception e) {
diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumTransformRegistrar.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumTransformRegistrar.java
index a00f706..0eae9c9 100644
--- a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumTransformRegistrar.java
+++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumTransformRegistrar.java
@@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory;
 })
 public class DebeziumTransformRegistrar implements ExternalTransformRegistrar {
   private static final Logger LOG = LoggerFactory.getLogger(DebeziumTransformRegistrar.class);
-  public static final String READ_JSON_URN = "beam:external:java:debezium:read:v1";
+  public static final String READ_JSON_URN = "beam:transform:org.apache.beam:debezium_read:v1";
 
   @Override
   public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalRead.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalRead.java
index 250b68c..8632604 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalRead.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalRead.java
@@ -40,7 +40,7 @@ import org.checkerframework.checker.nullness.qual.Nullable;
 public final class ExternalRead implements ExternalTransformRegistrar {
   public ExternalRead() {}
 
-  public static final String URN = "beam:external:java:pubsub:read:v1";
+  public static final String URN = "beam:transform:org.apache.beam:pubsub_read:v1";
 
   @Override
   public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java
index 718405b..ac558fd 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java
@@ -41,7 +41,7 @@ import org.checkerframework.checker.nullness.qual.Nullable;
 public final class ExternalWrite implements ExternalTransformRegistrar {
   public ExternalWrite() {}
 
-  public static final String URN = "beam:external:java:pubsub:write:v1";
+  public static final String URN = "beam:transform:org.apache.beam:pubsub_write:v1";
 
   @Override
   public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTransformRegistrar.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTransformRegistrar.java
index 42bb0a4..7de23ec 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTransformRegistrar.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTransformRegistrar.java
@@ -51,13 +51,13 @@ import org.joda.time.Duration;
 @Experimental(Kind.PORTABILITY)
 @AutoService(ExternalTransformRegistrar.class)
 public class SpannerTransformRegistrar implements ExternalTransformRegistrar {
-  public static final String INSERT_URN = "beam:external:java:spanner:insert:v1";
-  public static final String UPDATE_URN = "beam:external:java:spanner:update:v1";
-  public static final String REPLACE_URN = "beam:external:java:spanner:replace:v1";
+  public static final String INSERT_URN = "beam:transform:org.apache.beam:spanner_insert:v1";
+  public static final String UPDATE_URN = "beam:transform:org.apache.beam:spanner_update:v1";
+  public static final String REPLACE_URN = "beam:transform:org.apache.beam:spanner_replace:v1";
   public static final String INSERT_OR_UPDATE_URN =
-      "beam:external:java:spanner:insert_or_update:v1";
-  public static final String DELETE_URN = "beam:external:java:spanner:delete:v1";
-  public static final String READ_URN = "beam:external:java:spanner:read:v1";
+      "beam:transform:org.apache.beam:spanner_insert_or_update:v1";
+  public static final String DELETE_URN = "beam:transform:org.apache.beam:spanner_delete:v1";
+  public static final String READ_URN = "beam:transform:org.apache.beam:spanner_read:v1";
 
   @Override
   @NonNull
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java
index c4ce0ee..7f245d1 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java
@@ -77,7 +77,7 @@ public class PubsubIOExternalTest {
                     .setUniqueName("test")
                     .setSpec(
                         RunnerApi.FunctionSpec.newBuilder()
-                            .setUrn("beam:external:java:pubsub:read:v1")
+                            .setUrn(ExternalRead.URN)
                             .setPayload(payload.toByteString())))
             .setNamespace("test_namespace")
             .build();
@@ -136,7 +136,7 @@ public class PubsubIOExternalTest {
                     .putInputs("input", inputPCollection)
                     .setSpec(
                         RunnerApi.FunctionSpec.newBuilder()
-                            .setUrn("beam:external:java:pubsub:write:v1")
+                            .setUrn(ExternalWrite.URN)
                             .setPayload(payload.toByteString())))
             .setNamespace("test_namespace")
             .build();
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index b7aad11..1cb309a 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -781,9 +781,9 @@ public class KafkaIO {
       // Using the transform name in the URN so that the corresponding transform can be easily
       // identified.
       public static final String URN_WITH_METADATA =
-          "beam:external:java:kafkaio:externalwithmetadata:v1";
+          "beam:transform:org.apache.beam:kafka_read_with_metadata:v1";
       public static final String URN_WITHOUT_METADATA =
-          "beam:external:java:kafkaio:typedwithoutmetadata:v1";
+          "beam:transform:org.apache.beam:kafka_read_without_metadata:v1";
 
       @Override
       public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
@@ -2574,7 +2574,7 @@ public class KafkaIO {
     @AutoService(ExternalTransformRegistrar.class)
     public static class External implements ExternalTransformRegistrar {
 
-      public static final String URN = "beam:external:java:kafka:write:v1";
+      public static final String URN = "beam:transform:org.apache.beam:kafka_write:v1";
 
       @Override
       public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
index 75d2f19..c699885 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
@@ -33,7 +33,6 @@ import org.apache.beam.runners.core.construction.PipelineTranslation;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.expansion.service.ExpansionService;
 import org.apache.beam.sdk.io.kafka.KafkaIO.ByteArrayKafkaRecord;
-import org.apache.beam.sdk.io.kafka.KafkaIO.Read.External;
 import org.apache.beam.sdk.io.kafka.KafkaIO.RowsWithMetadata;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.Field;
@@ -129,7 +128,9 @@ public class KafkaIOExternalTest {
                     .setUniqueName("test")
                     .setSpec(
                         RunnerApi.FunctionSpec.newBuilder()
-                            .setUrn(External.URN_WITH_METADATA)
+                            .setUrn(
+                                org.apache.beam.sdk.io.kafka.KafkaIO.Read.External
+                                    .URN_WITH_METADATA)
                             .setPayload(payload.toByteString())))
             .setNamespace("test_namespace")
             .build();
@@ -232,7 +233,9 @@ public class KafkaIOExternalTest {
                     .setUniqueName("test")
                     .setSpec(
                         RunnerApi.FunctionSpec.newBuilder()
-                            .setUrn(External.URN_WITHOUT_METADATA)
+                            .setUrn(
+                                org.apache.beam.sdk.io.kafka.KafkaIO.Read.External
+                                    .URN_WITHOUT_METADATA)
                             .setPayload(payload.toByteString())))
             .setNamespace("test_namespace")
             .build();
@@ -304,7 +307,7 @@ public class KafkaIOExternalTest {
                     .putInputs("input", inputPCollection)
                     .setSpec(
                         RunnerApi.FunctionSpec.newBuilder()
-                            .setUrn("beam:external:java:kafka:write:v1")
+                            .setUrn(org.apache.beam.sdk.io.kafka.KafkaIO.Write.External.URN)
                             .setPayload(payload.toByteString())))
             .setNamespace("test_namespace")
             .build();
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java
index af48ca5..c0ea97e 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java
@@ -45,8 +45,8 @@ import org.joda.time.Instant;
   "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
 })
 public class KinesisTransformRegistrar implements ExternalTransformRegistrar {
-  public static final String WRITE_URN = "beam:external:java:kinesis:write:v1";
-  public static final String READ_DATA_URN = "beam:external:java:kinesis:read_data:v1";
+  public static final String WRITE_URN = "beam:transform:org.apache.beam:kinesis_write:v1";
+  public static final String READ_DATA_URN = "beam:transform:org.apache.beam:kinesis_read_data:v1";
 
   @Override
   public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeTransformRegistrar.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeTransformRegistrar.java
index e69219a..adc1920 100644
--- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeTransformRegistrar.java
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeTransformRegistrar.java
@@ -32,8 +32,8 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Immutabl
 @Experimental(Kind.PORTABILITY)
 @AutoService(ExternalTransformRegistrar.class)
 public final class SnowflakeTransformRegistrar implements ExternalTransformRegistrar {
-  public static final String READ_URN = "beam:external:java:snowflake:read:v1";
-  public static final String WRITE_URN = "beam:external:java:snowflake:write:v1";
+  public static final String READ_URN = "beam:transform:org.apache.beam:snowflake_read:v1";
+  public static final String WRITE_URN = "beam:transform:org.apache.beam:snowflake_write:v1";
 
   @Override
   public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
diff --git a/sdks/python/apache_beam/io/debezium.py b/sdks/python/apache_beam/io/debezium.py
index e598b97..ada2576 100644
--- a/sdks/python/apache_beam/io/debezium.py
+++ b/sdks/python/apache_beam/io/debezium.py
@@ -128,7 +128,7 @@ class ReadFromDebezium(PTransform):
 
         Experimental; no backwards compatibility guarantees.
     """
-  URN = 'beam:external:java:debezium:read:v1'
+  URN = 'beam:transform:org.apache.beam:debezium_read:v1'
 
   def __init__(
       self,
diff --git a/sdks/python/apache_beam/io/external/gcp/pubsub.py b/sdks/python/apache_beam/io/external/gcp/pubsub.py
index 8f13771..20f153d 100644
--- a/sdks/python/apache_beam/io/external/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/external/gcp/pubsub.py
@@ -43,7 +43,7 @@ class ReadFromPubSub(beam.PTransform):
   preparation of the Java SDK.  See BEAM-7870.
   """
 
-  URN = 'beam:external:java:pubsub:read:v1'
+  URN = 'beam:transform:org.apache.beam:pubsub_read:v1'
 
   def __init__(
       self,
@@ -127,7 +127,7 @@ class WriteToPubSub(beam.PTransform):
   preparation of the Java SDK.  See BEAM-7870.
   """
 
-  URN = 'beam:external:java:pubsub:write:v1'
+  URN = 'beam:transform:org.apache.beam:pubsub_write:v1'
 
   def __init__(
       self,
diff --git a/sdks/python/apache_beam/io/gcp/spanner.py b/sdks/python/apache_beam/io/gcp/spanner.py
index a5d3d14..c16daa4 100644
--- a/sdks/python/apache_beam/io/gcp/spanner.py
+++ b/sdks/python/apache_beam/io/gcp/spanner.py
@@ -173,7 +173,7 @@ class ReadFromSpanner(ExternalTransform):
   Experimental; no backwards compatibility guarantees.
   """
 
-  URN = 'beam:external:java:spanner:read:v1'
+  URN = 'beam:transform:org.apache.beam:spanner_read:v1'
 
   def __init__(
       self,
@@ -375,7 +375,7 @@ def _add_doc(
 )
 class SpannerDelete(ExternalTransform):
 
-  URN = 'beam:external:java:spanner:delete:v1'
+  URN = 'beam:transform:org.apache.beam:spanner_delete:v1'
 
   @_add_doc(_INIT_DOC, operation='a delete')
   def __init__(
@@ -427,7 +427,7 @@ class SpannerDelete(ExternalTransform):
 )
 class SpannerInsert(ExternalTransform):
 
-  URN = 'beam:external:java:spanner:insert:v1'
+  URN = 'beam:transform:org.apache.beam:spanner_insert:v1'
 
   @_add_doc(_INIT_DOC, operation='an insert')
   def __init__(
@@ -479,7 +479,7 @@ class SpannerInsert(ExternalTransform):
 )
 class SpannerReplace(ExternalTransform):
 
-  URN = 'beam:external:java:spanner:replace:v1'
+  URN = 'beam:transform:org.apache.beam:spanner_replace:v1'
 
   @_add_doc(_INIT_DOC, operation='a replace')
   def __init__(
@@ -531,7 +531,7 @@ class SpannerReplace(ExternalTransform):
 )
 class SpannerInsertOrUpdate(ExternalTransform):
 
-  URN = 'beam:external:java:spanner:insert_or_update:v1'
+  URN = 'beam:transform:org.apache.beam:spanner_insert_or_update:v1'
 
   @_add_doc(_INIT_DOC, operation='an insert-or-update')
   def __init__(
@@ -583,7 +583,7 @@ class SpannerInsertOrUpdate(ExternalTransform):
 )
 class SpannerUpdate(ExternalTransform):
 
-  URN = 'beam:external:java:spanner:update:v1'
+  URN = 'beam:transform:org.apache.beam:spanner_update:v1'
 
   @_add_doc(_INIT_DOC, operation='an update')
   def __init__(
diff --git a/sdks/python/apache_beam/io/jdbc.py b/sdks/python/apache_beam/io/jdbc.py
index afd39e0..683f94c 100644
--- a/sdks/python/apache_beam/io/jdbc.py
+++ b/sdks/python/apache_beam/io/jdbc.py
@@ -153,7 +153,7 @@ class WriteToJdbc(ExternalTransform):
   Experimental; no backwards compatibility guarantees.
   """
 
-  URN = 'beam:external:java:schemaio:jdbc:write:v1'
+  URN = 'beam:transform:org.apache.beam:schemaio_jdbc_write:v1'
 
   def __init__(
       self,
@@ -236,7 +236,7 @@ class ReadFromJdbc(ExternalTransform):
   Experimental; no backwards compatibility guarantees.
   """
 
-  URN = 'beam:external:java:schemaio:jdbc:read:v1'
+  URN = 'beam:transform:org.apache.beam:schemaio_jdbc_read:v1'
 
   def __init__(
       self,
diff --git a/sdks/python/apache_beam/io/kafka.py b/sdks/python/apache_beam/io/kafka.py
index 8d58dc3..c7c0756 100644
--- a/sdks/python/apache_beam/io/kafka.py
+++ b/sdks/python/apache_beam/io/kafka.py
@@ -117,8 +117,10 @@ class ReadFromKafka(ExternalTransform):
   create_time_policy = 'CreateTime'
   log_append_time = 'LogAppendTime'
 
-  URN_WITH_METADATA = 'beam:external:java:kafkaio:externalwithmetadata:v1'
-  URN_WITHOUT_METADATA = 'beam:external:java:kafkaio:typedwithoutmetadata:v1'
+  URN_WITH_METADATA = (
+      'beam:transform:org.apache.beam:kafka_read_with_metadata:v1')
+  URN_WITHOUT_METADATA = (
+      'beam:transform:org.apache.beam:kafka_read_without_metadata:v1')
 
   def __init__(
       self,
@@ -210,7 +212,7 @@ class WriteToKafka(ExternalTransform):
   byte_array_serializer = (
       'org.apache.kafka.common.serialization.ByteArraySerializer')
 
-  URN = 'beam:external:java:kafka:write:v1'
+  URN = 'beam:transform:org.apache.beam:kafka_write:v1'
 
   def __init__(
       self,
diff --git a/sdks/python/apache_beam/io/kinesis.py b/sdks/python/apache_beam/io/kinesis.py
index aca0dc1..c87dc01 100644
--- a/sdks/python/apache_beam/io/kinesis.py
+++ b/sdks/python/apache_beam/io/kinesis.py
@@ -123,7 +123,7 @@ class WriteToKinesis(ExternalTransform):
 
     Experimental; no backwards compatibility guarantees.
   """
-  URN = 'beam:external:java:kinesis:write:v1'
+  URN = 'beam:transform:org.apache.beam:kinesis_write:v1'
 
   def __init__(
       self,
@@ -199,7 +199,7 @@ class ReadDataFromKinesis(ExternalTransform):
 
     Experimental; no backwards compatibility guarantees.
   """
-  URN = 'beam:external:java:kinesis:read_data:v1'
+  URN = 'beam:transform:org.apache.beam:kinesis_read_data:v1'
 
   def __init__(
       self,
diff --git a/sdks/python/apache_beam/io/snowflake.py b/sdks/python/apache_beam/io/snowflake.py
index 9331aea..fa797f6 100644
--- a/sdks/python/apache_beam/io/snowflake.py
+++ b/sdks/python/apache_beam/io/snowflake.py
@@ -122,7 +122,7 @@ class ReadFromSnowflake(beam.PTransform):
     An external PTransform which reads from Snowflake.
   """
 
-  URN = 'beam:external:java:snowflake:read:v1'
+  URN = 'beam:transform:org.apache.beam:snowflake_read:v1'
 
   def __init__(
       self,
@@ -263,7 +263,7 @@ class WriteToSnowflake(beam.PTransform):
     An external PTransform which writes to Snowflake.
   """
 
-  URN = 'beam:external:java:snowflake:write:v1'
+  URN = 'beam:transform:org.apache.beam:snowflake_write:v1'
 
   def __init__(
       self,