You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2021/10/20 03:04:20 UTC
[beam] branch master updated: [BEAM-13079] Updates cross-language
transform URNs to use the new convention (#15748)
This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 8578c9a [BEAM-13079] Updates cross-language transform URNs to use the new convention (#15748)
8578c9a is described below
commit 8578c9a9352a8a5c9621647e9c585321f9761dc5
Author: Chamikara Jayalath <ch...@apache.org>
AuthorDate: Tue Oct 19 20:03:08 2021 -0700
[BEAM-13079] Updates cross-language transform URNs to use the new convention (#15748)
* Updates cross-language transform URNs to use the new convention
* Fix the Spanner Insert URN.
Co-authored-by: Brian Hulette <hu...@gmail.com>
* Fix the Kafka test
* Fixes the Pub/Sub test failure
Co-authored-by: Brian Hulette <hu...@gmail.com>
---
sdks/go/pkg/beam/io/xlang/kafkaio/kafka.go | 4 ++--
.../expansion/ExternalSchemaIOTransformRegistrar.java | 6 ++++--
.../apache/beam/io/debezium/DebeziumTransformRegistrar.java | 2 +-
.../java/org/apache/beam/sdk/io/gcp/pubsub/ExternalRead.java | 2 +-
.../org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java | 2 +-
.../beam/sdk/io/gcp/spanner/SpannerTransformRegistrar.java | 12 ++++++------
.../apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java | 4 ++--
.../src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 6 +++---
.../org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java | 11 +++++++----
.../beam/sdk/io/kinesis/KinesisTransformRegistrar.java | 4 ++--
.../snowflake/crosslanguage/SnowflakeTransformRegistrar.java | 4 ++--
sdks/python/apache_beam/io/debezium.py | 2 +-
sdks/python/apache_beam/io/external/gcp/pubsub.py | 4 ++--
sdks/python/apache_beam/io/gcp/spanner.py | 12 ++++++------
sdks/python/apache_beam/io/jdbc.py | 4 ++--
sdks/python/apache_beam/io/kafka.py | 8 +++++---
sdks/python/apache_beam/io/kinesis.py | 4 ++--
sdks/python/apache_beam/io/snowflake.py | 4 ++--
18 files changed, 51 insertions(+), 44 deletions(-)
diff --git a/sdks/go/pkg/beam/io/xlang/kafkaio/kafka.go b/sdks/go/pkg/beam/io/xlang/kafkaio/kafka.go
index 8781cff..785508a 100644
--- a/sdks/go/pkg/beam/io/xlang/kafkaio/kafka.go
+++ b/sdks/go/pkg/beam/io/xlang/kafkaio/kafka.go
@@ -80,8 +80,8 @@ const (
// be found in Java's KafkaIO documentation.
LogAppendTime policy = "LogAppendTime"
- readURN = "beam:external:java:kafkaio:typedwithoutmetadata:v1"
- writeURN = "beam:external:java:kafka:write:v1"
+ readURN = "beam:transform:org.apache.beam:kafka_read_without_metadata:v1"
+ writeURN = "beam:transform:org.apache.beam:kafka_write:v1"
)
// Read is a cross-language PTransform which reads from Kafka and returns a
diff --git a/sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/ExternalSchemaIOTransformRegistrar.java b/sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/ExternalSchemaIOTransformRegistrar.java
index 4f3ab30..94ce7cf 100644
--- a/sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/ExternalSchemaIOTransformRegistrar.java
+++ b/sdks/java/extensions/schemaio-expansion-service/src/main/java/org/apache/beam/sdk/extensions/schemaio/expansion/ExternalSchemaIOTransformRegistrar.java
@@ -54,10 +54,12 @@ public class ExternalSchemaIOTransformRegistrar implements ExternalTransformRegi
try {
for (SchemaIOProvider schemaIOProvider : ServiceLoader.load(SchemaIOProvider.class)) {
builder.put(
- "beam:external:java:schemaio:" + schemaIOProvider.identifier() + ":read:v1",
+ "beam:transform:org.apache.beam:schemaio_" + schemaIOProvider.identifier() + "_read:v1",
new ReaderBuilder(schemaIOProvider));
builder.put(
- "beam:external:java:schemaio:" + schemaIOProvider.identifier() + ":write:v1",
+ "beam:transform:org.apache.beam:schemaio_"
+ + schemaIOProvider.identifier()
+ + "_write:v1",
new WriterBuilder(schemaIOProvider));
}
} catch (Exception e) {
diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumTransformRegistrar.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumTransformRegistrar.java
index a00f706..0eae9c9 100644
--- a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumTransformRegistrar.java
+++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumTransformRegistrar.java
@@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory;
})
public class DebeziumTransformRegistrar implements ExternalTransformRegistrar {
private static final Logger LOG = LoggerFactory.getLogger(DebeziumTransformRegistrar.class);
- public static final String READ_JSON_URN = "beam:external:java:debezium:read:v1";
+ public static final String READ_JSON_URN = "beam:transform:org.apache.beam:debezium_read:v1";
@Override
public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalRead.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalRead.java
index 250b68c..8632604 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalRead.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalRead.java
@@ -40,7 +40,7 @@ import org.checkerframework.checker.nullness.qual.Nullable;
public final class ExternalRead implements ExternalTransformRegistrar {
public ExternalRead() {}
- public static final String URN = "beam:external:java:pubsub:read:v1";
+ public static final String URN = "beam:transform:org.apache.beam:pubsub_read:v1";
@Override
public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java
index 718405b..ac558fd 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/ExternalWrite.java
@@ -41,7 +41,7 @@ import org.checkerframework.checker.nullness.qual.Nullable;
public final class ExternalWrite implements ExternalTransformRegistrar {
public ExternalWrite() {}
- public static final String URN = "beam:external:java:pubsub:write:v1";
+ public static final String URN = "beam:transform:org.apache.beam:pubsub_write:v1";
@Override
public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTransformRegistrar.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTransformRegistrar.java
index 42bb0a4..7de23ec 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTransformRegistrar.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerTransformRegistrar.java
@@ -51,13 +51,13 @@ import org.joda.time.Duration;
@Experimental(Kind.PORTABILITY)
@AutoService(ExternalTransformRegistrar.class)
public class SpannerTransformRegistrar implements ExternalTransformRegistrar {
- public static final String INSERT_URN = "beam:external:java:spanner:insert:v1";
- public static final String UPDATE_URN = "beam:external:java:spanner:update:v1";
- public static final String REPLACE_URN = "beam:external:java:spanner:replace:v1";
+ public static final String INSERT_URN = "beam:transform:org.apache.beam:spanner_insert:v1";
+ public static final String UPDATE_URN = "beam:transform:org.apache.beam:spanner_update:v1";
+ public static final String REPLACE_URN = "beam:transform:org.apache.beam:spanner_replace:v1";
public static final String INSERT_OR_UPDATE_URN =
- "beam:external:java:spanner:insert_or_update:v1";
- public static final String DELETE_URN = "beam:external:java:spanner:delete:v1";
- public static final String READ_URN = "beam:external:java:spanner:read:v1";
+ "beam:transform:org.apache.beam:spanner_insert_or_update:v1";
+ public static final String DELETE_URN = "beam:transform:org.apache.beam:spanner_delete:v1";
+ public static final String READ_URN = "beam:transform:org.apache.beam:spanner_read:v1";
@Override
@NonNull
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java
index c4ce0ee..7f245d1 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java
@@ -77,7 +77,7 @@ public class PubsubIOExternalTest {
.setUniqueName("test")
.setSpec(
RunnerApi.FunctionSpec.newBuilder()
- .setUrn("beam:external:java:pubsub:read:v1")
+ .setUrn(ExternalRead.URN)
.setPayload(payload.toByteString())))
.setNamespace("test_namespace")
.build();
@@ -136,7 +136,7 @@ public class PubsubIOExternalTest {
.putInputs("input", inputPCollection)
.setSpec(
RunnerApi.FunctionSpec.newBuilder()
- .setUrn("beam:external:java:pubsub:write:v1")
+ .setUrn(ExternalWrite.URN)
.setPayload(payload.toByteString())))
.setNamespace("test_namespace")
.build();
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index b7aad11..1cb309a 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -781,9 +781,9 @@ public class KafkaIO {
// Using the transform name in the URN so that the corresponding transform can be easily
// identified.
public static final String URN_WITH_METADATA =
- "beam:external:java:kafkaio:externalwithmetadata:v1";
+ "beam:transform:org.apache.beam:kafka_read_with_metadata:v1";
public static final String URN_WITHOUT_METADATA =
- "beam:external:java:kafkaio:typedwithoutmetadata:v1";
+ "beam:transform:org.apache.beam:kafka_read_without_metadata:v1";
@Override
public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
@@ -2574,7 +2574,7 @@ public class KafkaIO {
@AutoService(ExternalTransformRegistrar.class)
public static class External implements ExternalTransformRegistrar {
- public static final String URN = "beam:external:java:kafka:write:v1";
+ public static final String URN = "beam:transform:org.apache.beam:kafka_write:v1";
@Override
public Map<String, Class<? extends ExternalTransformBuilder<?, ?, ?>>> knownBuilders() {
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
index 75d2f19..c699885 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
@@ -33,7 +33,6 @@ import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.expansion.service.ExpansionService;
import org.apache.beam.sdk.io.kafka.KafkaIO.ByteArrayKafkaRecord;
-import org.apache.beam.sdk.io.kafka.KafkaIO.Read.External;
import org.apache.beam.sdk.io.kafka.KafkaIO.RowsWithMetadata;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
@@ -129,7 +128,9 @@ public class KafkaIOExternalTest {
.setUniqueName("test")
.setSpec(
RunnerApi.FunctionSpec.newBuilder()
- .setUrn(External.URN_WITH_METADATA)
+ .setUrn(
+ org.apache.beam.sdk.io.kafka.KafkaIO.Read.External
+ .URN_WITH_METADATA)
.setPayload(payload.toByteString())))
.setNamespace("test_namespace")
.build();
@@ -232,7 +233,9 @@ public class KafkaIOExternalTest {
.setUniqueName("test")
.setSpec(
RunnerApi.FunctionSpec.newBuilder()
- .setUrn(External.URN_WITHOUT_METADATA)
+ .setUrn(
+ org.apache.beam.sdk.io.kafka.KafkaIO.Read.External
+ .URN_WITHOUT_METADATA)
.setPayload(payload.toByteString())))
.setNamespace("test_namespace")
.build();
@@ -304,7 +307,7 @@ public class KafkaIOExternalTest {
.putInputs("input", inputPCollection)
.setSpec(
RunnerApi.FunctionSpec.newBuilder()
- .setUrn("beam:external:java:kafka:write:v1")
+ .setUrn(org.apache.beam.sdk.io.kafka.KafkaIO.Write.External.URN)
.setPayload(payload.toByteString())))
.setNamespace("test_namespace")
.build();
diff --git a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java
index af48ca5..c0ea97e 100644
--- a/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java
+++ b/sdks/java/io/kinesis/src/main/java/org/apache/beam/sdk/io/kinesis/KinesisTransformRegistrar.java
@@ -45,8 +45,8 @@ import org.joda.time.Instant;
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
public class KinesisTransformRegistrar implements ExternalTransformRegistrar {
- public static final String WRITE_URN = "beam:external:java:kinesis:write:v1";
- public static final String READ_DATA_URN = "beam:external:java:kinesis:read_data:v1";
+ public static final String WRITE_URN = "beam:transform:org.apache.beam:kinesis_write:v1";
+ public static final String READ_DATA_URN = "beam:transform:org.apache.beam:kinesis_read_data:v1";
@Override
public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
diff --git a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeTransformRegistrar.java b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeTransformRegistrar.java
index e69219a..adc1920 100644
--- a/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeTransformRegistrar.java
+++ b/sdks/java/io/snowflake/src/main/java/org/apache/beam/sdk/io/snowflake/crosslanguage/SnowflakeTransformRegistrar.java
@@ -32,8 +32,8 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Immutabl
@Experimental(Kind.PORTABILITY)
@AutoService(ExternalTransformRegistrar.class)
public final class SnowflakeTransformRegistrar implements ExternalTransformRegistrar {
- public static final String READ_URN = "beam:external:java:snowflake:read:v1";
- public static final String WRITE_URN = "beam:external:java:snowflake:write:v1";
+ public static final String READ_URN = "beam:transform:org.apache.beam:snowflake_read:v1";
+ public static final String WRITE_URN = "beam:transform:org.apache.beam:snowflake_write:v1";
@Override
public Map<String, ExternalTransformBuilder<?, ?, ?>> knownBuilderInstances() {
diff --git a/sdks/python/apache_beam/io/debezium.py b/sdks/python/apache_beam/io/debezium.py
index e598b97..ada2576 100644
--- a/sdks/python/apache_beam/io/debezium.py
+++ b/sdks/python/apache_beam/io/debezium.py
@@ -128,7 +128,7 @@ class ReadFromDebezium(PTransform):
Experimental; no backwards compatibility guarantees.
"""
- URN = 'beam:external:java:debezium:read:v1'
+ URN = 'beam:transform:org.apache.beam:debezium_read:v1'
def __init__(
self,
diff --git a/sdks/python/apache_beam/io/external/gcp/pubsub.py b/sdks/python/apache_beam/io/external/gcp/pubsub.py
index 8f13771..20f153d 100644
--- a/sdks/python/apache_beam/io/external/gcp/pubsub.py
+++ b/sdks/python/apache_beam/io/external/gcp/pubsub.py
@@ -43,7 +43,7 @@ class ReadFromPubSub(beam.PTransform):
preparation of the Java SDK. See BEAM-7870.
"""
- URN = 'beam:external:java:pubsub:read:v1'
+ URN = 'beam:transform:org.apache.beam:pubsub_read:v1'
def __init__(
self,
@@ -127,7 +127,7 @@ class WriteToPubSub(beam.PTransform):
preparation of the Java SDK. See BEAM-7870.
"""
- URN = 'beam:external:java:pubsub:write:v1'
+ URN = 'beam:transform:org.apache.beam:pubsub_write:v1'
def __init__(
self,
diff --git a/sdks/python/apache_beam/io/gcp/spanner.py b/sdks/python/apache_beam/io/gcp/spanner.py
index a5d3d14..c16daa4 100644
--- a/sdks/python/apache_beam/io/gcp/spanner.py
+++ b/sdks/python/apache_beam/io/gcp/spanner.py
@@ -173,7 +173,7 @@ class ReadFromSpanner(ExternalTransform):
Experimental; no backwards compatibility guarantees.
"""
- URN = 'beam:external:java:spanner:read:v1'
+ URN = 'beam:transform:org.apache.beam:spanner_read:v1'
def __init__(
self,
@@ -375,7 +375,7 @@ def _add_doc(
)
class SpannerDelete(ExternalTransform):
- URN = 'beam:external:java:spanner:delete:v1'
+ URN = 'beam:transform:org.apache.beam:spanner_delete:v1'
@_add_doc(_INIT_DOC, operation='a delete')
def __init__(
@@ -427,7 +427,7 @@ class SpannerDelete(ExternalTransform):
)
class SpannerInsert(ExternalTransform):
- URN = 'beam:external:java:spanner:insert:v1'
+ URN = 'beam:transform:org.apache.beam:spanner_insert:v1'
@_add_doc(_INIT_DOC, operation='an insert')
def __init__(
@@ -479,7 +479,7 @@ class SpannerInsert(ExternalTransform):
)
class SpannerReplace(ExternalTransform):
- URN = 'beam:external:java:spanner:replace:v1'
+ URN = 'beam:transform:org.apache.beam:spanner_replace:v1'
@_add_doc(_INIT_DOC, operation='a replace')
def __init__(
@@ -531,7 +531,7 @@ class SpannerReplace(ExternalTransform):
)
class SpannerInsertOrUpdate(ExternalTransform):
- URN = 'beam:external:java:spanner:insert_or_update:v1'
+ URN = 'beam:transform:org.apache.beam:spanner_insert_or_update:v1'
@_add_doc(_INIT_DOC, operation='an insert-or-update')
def __init__(
@@ -583,7 +583,7 @@ class SpannerInsertOrUpdate(ExternalTransform):
)
class SpannerUpdate(ExternalTransform):
- URN = 'beam:external:java:spanner:update:v1'
+ URN = 'beam:transform:org.apache.beam:spanner_update:v1'
@_add_doc(_INIT_DOC, operation='an update')
def __init__(
diff --git a/sdks/python/apache_beam/io/jdbc.py b/sdks/python/apache_beam/io/jdbc.py
index afd39e0..683f94c 100644
--- a/sdks/python/apache_beam/io/jdbc.py
+++ b/sdks/python/apache_beam/io/jdbc.py
@@ -153,7 +153,7 @@ class WriteToJdbc(ExternalTransform):
Experimental; no backwards compatibility guarantees.
"""
- URN = 'beam:external:java:schemaio:jdbc:write:v1'
+ URN = 'beam:transform:org.apache.beam:schemaio_jdbc_write:v1'
def __init__(
self,
@@ -236,7 +236,7 @@ class ReadFromJdbc(ExternalTransform):
Experimental; no backwards compatibility guarantees.
"""
- URN = 'beam:external:java:schemaio:jdbc:read:v1'
+ URN = 'beam:transform:org.apache.beam:schemaio_jdbc_read:v1'
def __init__(
self,
diff --git a/sdks/python/apache_beam/io/kafka.py b/sdks/python/apache_beam/io/kafka.py
index 8d58dc3..c7c0756 100644
--- a/sdks/python/apache_beam/io/kafka.py
+++ b/sdks/python/apache_beam/io/kafka.py
@@ -117,8 +117,10 @@ class ReadFromKafka(ExternalTransform):
create_time_policy = 'CreateTime'
log_append_time = 'LogAppendTime'
- URN_WITH_METADATA = 'beam:external:java:kafkaio:externalwithmetadata:v1'
- URN_WITHOUT_METADATA = 'beam:external:java:kafkaio:typedwithoutmetadata:v1'
+ URN_WITH_METADATA = (
+ 'beam:transform:org.apache.beam:kafka_read_with_metadata:v1')
+ URN_WITHOUT_METADATA = (
+ 'beam:transform:org.apache.beam:kafka_read_without_metadata:v1')
def __init__(
self,
@@ -210,7 +212,7 @@ class WriteToKafka(ExternalTransform):
byte_array_serializer = (
'org.apache.kafka.common.serialization.ByteArraySerializer')
- URN = 'beam:external:java:kafka:write:v1'
+ URN = 'beam:transform:org.apache.beam:kafka_write:v1'
def __init__(
self,
diff --git a/sdks/python/apache_beam/io/kinesis.py b/sdks/python/apache_beam/io/kinesis.py
index aca0dc1..c87dc01 100644
--- a/sdks/python/apache_beam/io/kinesis.py
+++ b/sdks/python/apache_beam/io/kinesis.py
@@ -123,7 +123,7 @@ class WriteToKinesis(ExternalTransform):
Experimental; no backwards compatibility guarantees.
"""
- URN = 'beam:external:java:kinesis:write:v1'
+ URN = 'beam:transform:org.apache.beam:kinesis_write:v1'
def __init__(
self,
@@ -199,7 +199,7 @@ class ReadDataFromKinesis(ExternalTransform):
Experimental; no backwards compatibility guarantees.
"""
- URN = 'beam:external:java:kinesis:read_data:v1'
+ URN = 'beam:transform:org.apache.beam:kinesis_read:v1'
def __init__(
self,
diff --git a/sdks/python/apache_beam/io/snowflake.py b/sdks/python/apache_beam/io/snowflake.py
index 9331aea..fa797f6 100644
--- a/sdks/python/apache_beam/io/snowflake.py
+++ b/sdks/python/apache_beam/io/snowflake.py
@@ -122,7 +122,7 @@ class ReadFromSnowflake(beam.PTransform):
An external PTransform which reads from Snowflake.
"""
- URN = 'beam:external:java:snowflake:read:v1'
+ URN = 'beam:transform:org.apache.beam:snowflake_read:v1'
def __init__(
self,
@@ -263,7 +263,7 @@ class WriteToSnowflake(beam.PTransform):
An external PTransform which writes to Snowflake.
"""
- URN = 'beam:external:java:snowflake:write:v1'
+ URN = 'beam:transform:org.apache.beam:snowflake_write:v1'
def __init__(
self,