You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2019/09/13 16:11:43 UTC

[beam] 01/01: Revert "[7746] Create a more user friendly external transform API"

This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch revert-9098-py_external_api
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 971c3e343f87f357a6448c41d2445eebcf44a237
Author: Ahmet Altay <aa...@gmail.com>
AuthorDate: Fri Sep 13 09:10:47 2019 -0700

    Revert "[7746] Create a more user friendly external transform API"
---
 .../construction/expansion/ExpansionService.java   |  10 +-
 .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java |  68 +++++---
 .../beam/sdk/io/kafka/KafkaIOExternalTest.java     |  47 ++---
 .../apache_beam/io/external/generate_sequence.py   |  62 +++++--
 sdks/python/apache_beam/io/external/kafka.py       | 168 +++++++++++-------
 sdks/python/apache_beam/transforms/external.py     | 189 +--------------------
 .../python/apache_beam/transforms/external_test.py | 164 +-----------------
 .../apache_beam/transforms/external_test_py3.py    |  93 ----------
 .../apache_beam/transforms/external_test_py37.py   |  71 --------
 sdks/python/scripts/generate_pydoc.sh              |   2 +-
 sdks/python/scripts/run_mini_py3lint.sh            |  21 +--
 sdks/python/scripts/run_pylint.sh                  |   2 +-
 sdks/python/setup.cfg                              |   3 -
 sdks/python/tox.ini                                |  22 +--
 14 files changed, 240 insertions(+), 682 deletions(-)

diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java
index ae1a3d7..795298a 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java
@@ -179,15 +179,11 @@ public class ExpansionService extends ExpansionServiceGrpc.ExpansionServiceImplB
         } catch (NoSuchMethodException e) {
           throw new RuntimeException(
               String.format(
-                  "The configuration class %s is missing a setter %s for %s with type %s",
-                  config.getClass(),
-                  setterName,
-                  fieldName,
-                  coder.getEncodedTypeDescriptor().getType().getTypeName()),
+                  "The configuration class %s is missing a setter %s for %s",
+                  config.getClass(), setterName, fieldName),
               e);
         }
-        method.invoke(
-            config, coder.decode(entry.getValue().getPayload().newInput(), Coder.Context.NESTED));
+        method.invoke(config, coder.decode(entry.getValue().getPayload().newInput()));
       }
     }
 
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index ca74ecb..373d4b8 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -65,6 +65,7 @@ import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
@@ -401,22 +402,26 @@ public class KafkaIO {
       public PTransform<PBegin, PCollection<KV<K, V>>> buildExternal(
           External.Configuration config) {
         ImmutableList.Builder<String> listBuilder = ImmutableList.builder();
-        for (String topic : config.topics) {
-          listBuilder.add(topic);
+        for (byte[] topic : config.topics) {
+          listBuilder.add(utf8String(topic));
         }
         setTopics(listBuilder.build());
 
-        Class keyDeserializer = resolveClass(config.keyDeserializer);
+        String keyDeserializerClassName = utf8String(config.keyDeserializer);
+        Class keyDeserializer = resolveClass(keyDeserializerClassName);
         setKeyDeserializer(keyDeserializer);
         setKeyCoder(resolveCoder(keyDeserializer));
 
-        Class valueDeserializer = resolveClass(config.valueDeserializer);
+        String valueDeserializerClassName = utf8String(config.valueDeserializer);
+        Class valueDeserializer = resolveClass(valueDeserializerClassName);
         setValueDeserializer(valueDeserializer);
         setValueCoder(resolveCoder(valueDeserializer));
 
         Map<String, Object> consumerConfig = new HashMap<>();
-        for (KV<String, String> kv : config.consumerConfig) {
-          consumerConfig.put(kv.getKey(), kv.getValue());
+        for (KV<byte[], byte[]> kv : config.consumerConfig) {
+          String key = utf8String(kv.getKey());
+          String value = utf8String(kv.getValue());
+          consumerConfig.put(key, value);
         }
         // Key and Value Deserializers always have to be in the config.
         consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getName());
@@ -475,24 +480,24 @@ public class KafkaIO {
       public static class Configuration {
 
         // All byte arrays are UTF-8 encoded strings
-        private Iterable<KV<String, String>> consumerConfig;
-        private Iterable<String> topics;
-        private String keyDeserializer;
-        private String valueDeserializer;
+        private Iterable<KV<byte[], byte[]>> consumerConfig;
+        private Iterable<byte[]> topics;
+        private byte[] keyDeserializer;
+        private byte[] valueDeserializer;
 
-        public void setConsumerConfig(Iterable<KV<String, String>> consumerConfig) {
+        public void setConsumerConfig(Iterable<KV<byte[], byte[]>> consumerConfig) {
           this.consumerConfig = consumerConfig;
         }
 
-        public void setTopics(Iterable<String> topics) {
+        public void setTopics(Iterable<byte[]> topics) {
           this.topics = topics;
         }
 
-        public void setKeyDeserializer(String keyDeserializer) {
+        public void setKeyDeserializer(byte[] keyDeserializer) {
           this.keyDeserializer = keyDeserializer;
         }
 
-        public void setValueDeserializer(String valueDeserializer) {
+        public void setValueDeserializer(byte[] valueDeserializer) {
           this.valueDeserializer = valueDeserializer;
         }
       }
@@ -1360,21 +1365,24 @@ public class KafkaIO {
       @Override
       public PTransform<PCollection<KV<K, V>>, PDone> buildExternal(
           External.Configuration configuration) {
-        setTopic(configuration.topic);
+        String topic = utf8String(configuration.topic);
+        setTopic(topic);
 
         Map<String, Object> producerConfig = new HashMap<>();
-        for (KV<String, String> kv : configuration.producerConfig) {
-          producerConfig.put(kv.getKey(), kv.getValue());
+        for (KV<byte[], byte[]> kv : configuration.producerConfig) {
+          String key = utf8String(kv.getKey());
+          String value = utf8String(kv.getValue());
+          producerConfig.put(key, value);
         }
-        Class keySerializer = resolveClass(configuration.keySerializer);
-        Class valSerializer = resolveClass(configuration.valueSerializer);
+        Class keySerializer = resolveClass(utf8String(configuration.keySerializer));
+        Class valSerializer = resolveClass(utf8String(configuration.valueSerializer));
 
         WriteRecords<K, V> writeRecords =
             KafkaIO.<K, V>writeRecords()
                 .withProducerConfigUpdates(producerConfig)
                 .withKeySerializer(keySerializer)
                 .withValueSerializer(valSerializer)
-                .withTopic(configuration.topic);
+                .withTopic(topic);
         setWriteRecordsTransform(writeRecords);
 
         return build();
@@ -1397,24 +1405,24 @@ public class KafkaIO {
       public static class Configuration {
 
         // All byte arrays are UTF-8 encoded strings
-        private Iterable<KV<String, String>> producerConfig;
-        private String topic;
-        private String keySerializer;
-        private String valueSerializer;
+        private Iterable<KV<byte[], byte[]>> producerConfig;
+        private byte[] topic;
+        private byte[] keySerializer;
+        private byte[] valueSerializer;
 
-        public void setProducerConfig(Iterable<KV<String, String>> producerConfig) {
+        public void setProducerConfig(Iterable<KV<byte[], byte[]>> producerConfig) {
           this.producerConfig = producerConfig;
         }
 
-        public void setTopic(String topic) {
+        public void setTopic(byte[] topic) {
           this.topic = topic;
         }
 
-        public void setKeySerializer(String keySerializer) {
+        public void setKeySerializer(byte[] keySerializer) {
           this.keySerializer = keySerializer;
         }
 
-        public void setValueSerializer(String valueSerializer) {
+        public void setValueSerializer(byte[] valueSerializer) {
           this.valueSerializer = valueSerializer;
         }
       }
@@ -1683,6 +1691,10 @@ public class KafkaIO {
         String.format("Could not extract the Kafka Deserializer type from %s", deserializer));
   }
 
+  private static String utf8String(byte[] bytes) {
+    return new String(bytes, Charsets.UTF_8);
+  }
+
   private static Class resolveClass(String className) {
     try {
       return Class.forName(className);
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
index a7b7f8a..83f2cc7 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
@@ -32,15 +32,17 @@ import org.apache.beam.runners.core.construction.PipelineTranslation;
 import org.apache.beam.runners.core.construction.ReadTranslation;
 import org.apache.beam.runners.core.construction.expansion.ExpansionService;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Impulse;
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString;
 import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
@@ -74,7 +76,7 @@ public class KafkaIOExternalTest {
                 "topics",
                 ExternalTransforms.ConfigValue.newBuilder()
                     .addCoderUrn("beam:coder:iterable:v1")
-                    .addCoderUrn("beam:coder:string_utf8:v1")
+                    .addCoderUrn("beam:coder:bytes:v1")
                     .setPayload(ByteString.copyFrom(listAsBytes(topics)))
                     .build())
             .putConfiguration(
@@ -82,20 +84,20 @@ public class KafkaIOExternalTest {
                 ExternalTransforms.ConfigValue.newBuilder()
                     .addCoderUrn("beam:coder:iterable:v1")
                     .addCoderUrn("beam:coder:kv:v1")
-                    .addCoderUrn("beam:coder:string_utf8:v1")
-                    .addCoderUrn("beam:coder:string_utf8:v1")
+                    .addCoderUrn("beam:coder:bytes:v1")
+                    .addCoderUrn("beam:coder:bytes:v1")
                     .setPayload(ByteString.copyFrom(mapAsBytes(consumerConfig)))
                     .build())
             .putConfiguration(
                 "key_deserializer",
                 ExternalTransforms.ConfigValue.newBuilder()
-                    .addCoderUrn("beam:coder:string_utf8:v1")
+                    .addCoderUrn("beam:coder:bytes:v1")
                     .setPayload(ByteString.copyFrom(encodeString(keyDeserializer)))
                     .build())
             .putConfiguration(
                 "value_deserializer",
                 ExternalTransforms.ConfigValue.newBuilder()
-                    .addCoderUrn("beam:coder:string_utf8:v1")
+                    .addCoderUrn("beam:coder:bytes:v1")
                     .setPayload(ByteString.copyFrom(encodeString(valueDeserializer)))
                     .build())
             .build();
@@ -159,7 +161,7 @@ public class KafkaIOExternalTest {
             .putConfiguration(
                 "topic",
                 ExternalTransforms.ConfigValue.newBuilder()
-                    .addCoderUrn("beam:coder:string_utf8:v1")
+                    .addCoderUrn("beam:coder:bytes:v1")
                     .setPayload(ByteString.copyFrom(encodeString(topic)))
                     .build())
             .putConfiguration(
@@ -167,20 +169,20 @@ public class KafkaIOExternalTest {
                 ExternalTransforms.ConfigValue.newBuilder()
                     .addCoderUrn("beam:coder:iterable:v1")
                     .addCoderUrn("beam:coder:kv:v1")
-                    .addCoderUrn("beam:coder:string_utf8:v1")
-                    .addCoderUrn("beam:coder:string_utf8:v1")
+                    .addCoderUrn("beam:coder:bytes:v1")
+                    .addCoderUrn("beam:coder:bytes:v1")
                     .setPayload(ByteString.copyFrom(mapAsBytes(producerConfig)))
                     .build())
             .putConfiguration(
                 "key_serializer",
                 ExternalTransforms.ConfigValue.newBuilder()
-                    .addCoderUrn("beam:coder:string_utf8:v1")
+                    .addCoderUrn("beam:coder:bytes:v1")
                     .setPayload(ByteString.copyFrom(encodeString(keySerializer)))
                     .build())
             .putConfiguration(
                 "value_serializer",
                 ExternalTransforms.ConfigValue.newBuilder()
-                    .addCoderUrn("beam:coder:string_utf8:v1")
+                    .addCoderUrn("beam:coder:bytes:v1")
                     .setPayload(ByteString.copyFrom(encodeString(valueSerializer)))
                     .build())
             .build();
@@ -246,30 +248,37 @@ public class KafkaIOExternalTest {
   }
 
   private static byte[] listAsBytes(List<String> stringList) throws IOException {
-    IterableCoder<String> coder = IterableCoder.of(StringUtf8Coder.of());
+    IterableCoder<byte[]> coder = IterableCoder.of(ByteArrayCoder.of());
+    List<byte[]> bytesList =
+        stringList.stream().map(KafkaIOExternalTest::utf8Bytes).collect(Collectors.toList());
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    coder.encode(stringList, baos);
+    coder.encode(bytesList, baos);
     return baos.toByteArray();
   }
 
   private static byte[] mapAsBytes(Map<String, String> stringMap) throws IOException {
-    IterableCoder<KV<String, String>> coder =
-        IterableCoder.of(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
-    List<KV<String, String>> stringList =
+    IterableCoder<KV<byte[], byte[]>> coder =
+        IterableCoder.of(KvCoder.of(ByteArrayCoder.of(), ByteArrayCoder.of()));
+    List<KV<byte[], byte[]>> bytesList =
         stringMap.entrySet().stream()
-            .map(kv -> KV.of(kv.getKey(), kv.getValue()))
+            .map(kv -> KV.of(utf8Bytes(kv.getKey()), utf8Bytes(kv.getValue())))
             .collect(Collectors.toList());
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    coder.encode(stringList, baos);
+    coder.encode(bytesList, baos);
     return baos.toByteArray();
   }
 
   private static byte[] encodeString(String str) throws IOException {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    StringUtf8Coder.of().encode(str, baos);
+    ByteArrayCoder.of().encode(utf8Bytes(str), baos);
     return baos.toByteArray();
   }
 
+  private static byte[] utf8Bytes(String str) {
+    Preconditions.checkNotNull(str, "String must not be null.");
+    return str.getBytes(Charsets.UTF_8);
+  }
+
   private static class TestStreamObserver<T> implements StreamObserver<T> {
 
     private T result;
diff --git a/sdks/python/apache_beam/io/external/generate_sequence.py b/sdks/python/apache_beam/io/external/generate_sequence.py
index a17ec7b..0e0b1fd 100644
--- a/sdks/python/apache_beam/io/external/generate_sequence.py
+++ b/sdks/python/apache_beam/io/external/generate_sequence.py
@@ -17,11 +17,15 @@
 
 from __future__ import absolute_import
 
-from apache_beam.transforms.external import ExternalTransform
-from apache_beam.transforms.external import ImplicitSchemaPayloadBuilder
+from apache_beam import ExternalTransform
+from apache_beam import pvalue
+from apache_beam.coders import VarIntCoder
+from apache_beam.portability.api.external_transforms_pb2 import ConfigValue
+from apache_beam.portability.api.external_transforms_pb2 import ExternalConfigurationPayload
+from apache_beam.transforms import ptransform
 
 
-class GenerateSequence(ExternalTransform):
+class GenerateSequence(ptransform.PTransform):
   """
     An external PTransform which provides a bounded or unbounded stream of
     integers.
@@ -45,19 +49,47 @@ class GenerateSequence(ExternalTransform):
 
     Experimental; no backwards compatibility guarantees.
   """
+
   URN = 'beam:external:java:generate_sequence:v1'
 
   def __init__(self, start, stop=None,
                elements_per_period=None, max_read_time=None,
-               expansion_service=None):
-    super(GenerateSequence, self).__init__(
-        self.URN,
-        ImplicitSchemaPayloadBuilder(
-            {
-                'start': start,
-                'stop': stop,
-                'elements_per_period': elements_per_period,
-                'max_read_time': max_read_time,
-            }
-        ),
-        expansion_service)
+               expansion_service='localhost:8097'):
+    super(GenerateSequence, self).__init__()
+    self.start = start
+    self.stop = stop
+    self.elements_per_period = elements_per_period
+    self.max_read_time = max_read_time
+    self.expansion_service = expansion_service
+
+  def expand(self, pbegin):
+    if not isinstance(pbegin, pvalue.PBegin):
+      raise Exception("GenerateSequence must be a root transform")
+
+    coder = VarIntCoder()
+    coder_urn = ['beam:coder:varint:v1']
+    args = {
+        'start':
+        ConfigValue(
+            coder_urn=coder_urn,
+            payload=coder.encode(self.start))
+    }
+    if self.stop:
+      args['stop'] = ConfigValue(
+          coder_urn=coder_urn,
+          payload=coder.encode(self.stop))
+    if self.elements_per_period:
+      args['elements_per_period'] = ConfigValue(
+          coder_urn=coder_urn,
+          payload=coder.encode(self.elements_per_period))
+    if self.max_read_time:
+      args['max_read_time'] = ConfigValue(
+          coder_urn=coder_urn,
+          payload=coder.encode(self.max_read_time))
+
+    payload = ExternalConfigurationPayload(configuration=args)
+    return pbegin.apply(
+        ExternalTransform(
+            self.URN,
+            payload.SerializeToString(),
+            self.expansion_service))
diff --git a/sdks/python/apache_beam/io/external/kafka.py b/sdks/python/apache_beam/io/external/kafka.py
index f824515..ed24c00 100644
--- a/sdks/python/apache_beam/io/external/kafka.py
+++ b/sdks/python/apache_beam/io/external/kafka.py
@@ -37,25 +37,18 @@
 
 from __future__ import absolute_import
 
-import typing
-
-from past.builtins import unicode
-
-from apache_beam.transforms.external import ExternalTransform
-from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder
-
-ReadFromKafkaSchema = typing.NamedTuple(
-    'ReadFromKafkaSchema',
-    [
-        ('consumer_config', typing.List[typing.Tuple[unicode, unicode]]),
-        ('topics', typing.List[unicode]),
-        ('key_deserializer', unicode),
-        ('value_deserializer', unicode),
-    ]
-)
-
-
-class ReadFromKafka(ExternalTransform):
+from apache_beam import ExternalTransform
+from apache_beam import pvalue
+from apache_beam.coders import BytesCoder
+from apache_beam.coders import IterableCoder
+from apache_beam.coders import TupleCoder
+from apache_beam.coders.coders import LengthPrefixCoder
+from apache_beam.portability.api.external_transforms_pb2 import ConfigValue
+from apache_beam.portability.api.external_transforms_pb2 import ExternalConfigurationPayload
+from apache_beam.transforms import ptransform
+
+
+class ReadFromKafka(ptransform.PTransform):
   """
     An external PTransform which reads from Kafka and returns a KV pair for
     each item in the specified Kafka topics. If no Kafka Deserializer for
@@ -71,13 +64,11 @@ class ReadFromKafka(ExternalTransform):
   byte_array_deserializer = 'org.apache.kafka.common.serialization.' \
                             'ByteArrayDeserializer'
 
-  URN = 'beam:external:java:kafka:read:v1'
-
   def __init__(self, consumer_config,
                topics,
                key_deserializer=byte_array_deserializer,
                value_deserializer=byte_array_deserializer,
-               expansion_service=None):
+               expansion_service='localhost:8097'):
     """
     Initializes a read operation from Kafka.
 
@@ -97,32 +88,38 @@ class ReadFromKafka(ExternalTransform):
                                serialization.ByteArrayDeserializer'.
     :param expansion_service: The address (host:port) of the ExpansionService.
     """
-    super(ReadFromKafka, self).__init__(
-        self.URN,
-        NamedTupleBasedPayloadBuilder(
-            ReadFromKafkaSchema(
-                consumer_config=list(consumer_config.items()),
-                topics=topics,
-                key_deserializer=key_deserializer,
-                value_deserializer=value_deserializer,
-            )
-        ),
-        expansion_service
-    )
-
-
-WriteToKafkaSchema = typing.NamedTuple(
-    'WriteToKafkaSchema',
-    [
-        ('producer_config', typing.List[typing.Tuple[unicode, unicode]]),
-        ('topic', unicode),
-        ('key_serializer', unicode),
-        ('value_serializer', unicode),
-    ]
-)
-
-
-class WriteToKafka(ExternalTransform):
+    super(ReadFromKafka, self).__init__()
+    self._urn = 'beam:external:java:kafka:read:v1'
+    self.consumer_config = consumer_config
+    self.topics = topics
+    self.key_deserializer = key_deserializer
+    self.value_deserializer = value_deserializer
+    self.expansion_service = expansion_service
+
+  def expand(self, pbegin):
+    if not isinstance(pbegin, pvalue.PBegin):
+      raise Exception("ReadFromKafka must be a root transform")
+
+    args = {
+        'consumer_config':
+            _encode_map(self.consumer_config),
+        'topics':
+            _encode_list(self.topics),
+        'key_deserializer':
+            _encode_str(self.key_deserializer),
+        'value_deserializer':
+            _encode_str(self.value_deserializer),
+    }
+
+    payload = ExternalConfigurationPayload(configuration=args)
+    return pbegin.apply(
+        ExternalTransform(
+            self._urn,
+            payload.SerializeToString(),
+            self.expansion_service))
+
+
+class WriteToKafka(ptransform.PTransform):
   """
     An external PTransform which writes KV data to a specified Kafka topic.
     If no Kafka Serializer for key/value is provided, then key/value are
@@ -135,13 +132,11 @@ class WriteToKafka(ExternalTransform):
   byte_array_serializer = 'org.apache.kafka.common.serialization.' \
                           'ByteArraySerializer'
 
-  URN = 'beam:external:java:kafka:write:v1'
-
   def __init__(self, producer_config,
                topic,
                key_serializer=byte_array_serializer,
                value_serializer=byte_array_serializer,
-               expansion_service=None):
+               expansion_service='localhost:8097'):
     """
     Initializes a write operation to Kafka.
 
@@ -161,15 +156,62 @@ class WriteToKafka(ExternalTransform):
                                serialization.ByteArraySerializer'.
     :param expansion_service: The address (host:port) of the ExpansionService.
     """
-    super(WriteToKafka, self).__init__(
-        self.URN,
-        NamedTupleBasedPayloadBuilder(
-            WriteToKafkaSchema(
-                producer_config=list(producer_config.items()),
-                topic=topic,
-                key_serializer=key_serializer,
-                value_serializer=value_serializer,
-            )
-        ),
-        expansion_service
-    )
+    super(WriteToKafka, self).__init__()
+    self._urn = 'beam:external:java:kafka:write:v1'
+    self.producer_config = producer_config
+    self.topic = topic
+    self.key_serializer = key_serializer
+    self.value_serializer = value_serializer
+    self.expansion_service = expansion_service
+
+  def expand(self, pvalue):
+    args = {
+        'producer_config':
+            _encode_map(self.producer_config),
+        'topic':
+            _encode_str(self.topic),
+        'key_serializer':
+            _encode_str(self.key_serializer),
+        'value_serializer':
+            _encode_str(self.value_serializer),
+    }
+
+    payload = ExternalConfigurationPayload(configuration=args)
+    return pvalue.apply(
+        ExternalTransform(
+            self._urn,
+            payload.SerializeToString(),
+            self.expansion_service))
+
+
+def _encode_map(dict_obj):
+  kv_list = [(key.encode('utf-8'), val.encode('utf-8'))
+             for key, val in dict_obj.items()]
+  coder = IterableCoder(TupleCoder(
+      [LengthPrefixCoder(BytesCoder()), LengthPrefixCoder(BytesCoder())]))
+  coder_urns = ['beam:coder:iterable:v1',
+                'beam:coder:kv:v1',
+                'beam:coder:bytes:v1',
+                'beam:coder:bytes:v1']
+  return ConfigValue(
+      coder_urn=coder_urns,
+      payload=coder.encode(kv_list))
+
+
+def _encode_list(list_obj):
+  encoded_list = [val.encode('utf-8') for val in list_obj]
+  coder = IterableCoder(LengthPrefixCoder(BytesCoder()))
+  coder_urns = ['beam:coder:iterable:v1',
+                'beam:coder:bytes:v1']
+  return ConfigValue(
+      coder_urn=coder_urns,
+      payload=coder.encode(encoded_list))
+
+
+def _encode_str(str_obj):
+  encoded_str = str_obj.encode('utf-8')
+  coder = LengthPrefixCoder(BytesCoder())
+  coder_urns = ['beam:coder:bytes:v1']
+  return ConfigValue(
+      coder_urn=coder_urns,
+      payload=coder.encode(encoded_str))
diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py
index fd79fcf..a7e9fb5 100644
--- a/sdks/python/apache_beam/transforms/external.py
+++ b/sdks/python/apache_beam/transforms/external.py
@@ -27,18 +27,11 @@ import copy
 import threading
 
 from apache_beam import pvalue
-from apache_beam.coders import registry
 from apache_beam.portability import common_urns
 from apache_beam.portability.api import beam_expansion_api_pb2
 from apache_beam.portability.api import beam_runner_api_pb2
-from apache_beam.portability.api.external_transforms_pb2 import ConfigValue
-from apache_beam.portability.api.external_transforms_pb2 import ExternalConfigurationPayload
 from apache_beam.runners import pipeline_context
 from apache_beam.transforms import ptransform
-from apache_beam.typehints.native_type_compatibility import convert_to_beam_type
-from apache_beam.typehints.trivial_inference import instance_to_type
-from apache_beam.typehints.typehints import Union
-from apache_beam.typehints.typehints import UnionConstraint
 
 # Protect against environments where grpc is not available.
 # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
@@ -49,170 +42,6 @@ except ImportError:
   grpc = None
 # pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
 
-DEFAULT_EXPANSION_SERVICE = 'localhost:8097'
-
-
-def _is_optional_or_none(typehint):
-  return (type(None) in typehint.union_types
-          if isinstance(typehint, UnionConstraint) else typehint is type(None))
-
-
-def _strip_optional(typehint):
-  if not _is_optional_or_none(typehint):
-    return typehint
-  new_types = typehint.union_types.difference({type(None)})
-  if len(new_types) == 1:
-    return list(new_types)[0]
-  return Union[new_types]
-
-
-def iter_urns(coder, context=None):
-  yield coder.to_runner_api_parameter(context)[0]
-  for child in coder._get_component_coders():
-    for urn in iter_urns(child, context):
-      yield urn
-
-
-class PayloadBuilder(object):
-  """
-  Abstract base class for building payloads to pass to ExternalTransform.
-  """
-
-  @classmethod
-  def _config_value(cls, obj, typehint):
-    """
-    Helper to create a ConfigValue with an encoded value.
-    """
-    coder = registry.get_coder(typehint)
-    urns = list(iter_urns(coder))
-    if 'beam:coder:pickled_python:v1' in urns:
-      raise RuntimeError("Found non-portable coder for %s" % (typehint,))
-    return ConfigValue(
-        coder_urn=urns,
-        payload=coder.get_impl().encode_nested(obj))
-
-  def build(self):
-    """
-    :return: ExternalConfigurationPayload
-    """
-    raise NotImplementedError
-
-  def payload(self):
-    """
-    The serialized ExternalConfigurationPayload
-
-    :return: bytes
-    """
-    return self.build().SerializeToString()
-
-
-class SchemaBasedPayloadBuilder(PayloadBuilder):
-  """
-  Base class for building payloads based on a schema that provides
-  type information for each configuration value to encode.
-
-  Note that if the schema defines a type as Optional, the corresponding value
-  will be omitted from the encoded payload, and thus the native transform
-  will determine the default.
-  """
-
-  def __init__(self, values, schema):
-    """
-    :param values: mapping of config names to values
-    :param schema: mapping of config names to types
-    """
-    self._values = values
-    self._schema = schema
-
-  @classmethod
-  def _encode_config(cls, values, schema):
-    result = {}
-    for key, value in values.items():
-
-      try:
-        typehint = schema[key]
-      except KeyError:
-        raise RuntimeError("No typehint provided for key %r" % key)
-
-      typehint = convert_to_beam_type(typehint)
-
-      if value is None:
-        if not _is_optional_or_none(typehint):
-          raise RuntimeError("If value is None, typehint should be "
-                             "optional. Got %r" % typehint)
-        # make it easy for user to filter None by default
-        continue
-      else:
-        # strip Optional from typehint so that pickled_python coder is not used
-        # for known types.
-        typehint = _strip_optional(typehint)
-      result[key] = cls._config_value(value, typehint)
-    return result
-
-  def build(self):
-    """
-    :return: ExternalConfigurationPayload
-    """
-    args = self._encode_config(self._values, self._schema)
-    return ExternalConfigurationPayload(configuration=args)
-
-
-class ImplicitSchemaPayloadBuilder(SchemaBasedPayloadBuilder):
-  """
-  Build a payload that generates a schema from the provided values.
-  """
-  def __init__(self, values):
-    schema = {key: instance_to_type(value) for key, value in values.items()}
-    super(ImplicitSchemaPayloadBuilder, self).__init__(values, schema)
-
-
-class NamedTupleBasedPayloadBuilder(SchemaBasedPayloadBuilder):
-  """
-  Build a payload based on a NamedTuple schema.
-  """
-  def __init__(self, tuple_instance):
-    """
-    :param tuple_instance: an instance of a typing.NamedTuple
-    """
-    super(NamedTupleBasedPayloadBuilder, self).__init__(
-        values=tuple_instance._asdict(), schema=tuple_instance._field_types)
-
-
-class AnnotationBasedPayloadBuilder(SchemaBasedPayloadBuilder):
-  """
-  Build a payload based on an external transform's type annotations.
-
-  Supported in python 3 only.
-  """
-  def __init__(self, transform, **values):
-    """
-    :param transform: a PTransform instance or class. type annotations will
-                      be gathered from its __init__ method
-    :param values: values to encode
-    """
-    schema = {k: v for k, v in
-              transform.__init__.__annotations__.items()
-              if k in values}
-    super(AnnotationBasedPayloadBuilder, self).__init__(values, schema)
-
-
-class DataclassBasedPayloadBuilder(SchemaBasedPayloadBuilder):
-  """
-  Build a payload based on an external transform that uses dataclasses.
-
-  Supported in python 3 only.
-  """
-  def __init__(self, transform):
-    """
-    :param transform: a dataclass-decorated PTransform instance from which to
-                      gather type annotations and values
-    """
-    import dataclasses
-    schema = {field.name: field.type for field in
-              dataclasses.fields(transform)}
-    super(DataclassBasedPayloadBuilder, self).__init__(
-        dataclasses.asdict(transform), schema)
-
 
 class ExternalTransform(ptransform.PTransform):
   """
@@ -227,29 +56,15 @@ class ExternalTransform(ptransform.PTransform):
   _EXPANDED_TRANSFORM_UNIQUE_NAME = 'root'
   _IMPULSE_PREFIX = 'impulse'
 
-  def __init__(self, urn, payload, endpoint=None):
-    endpoint = endpoint or DEFAULT_EXPANSION_SERVICE
+  def __init__(self, urn, payload, endpoint):
     if grpc is None and isinstance(endpoint, str):
       raise NotImplementedError('Grpc required for external transforms.')
     # TODO: Start an endpoint given an environment?
     self._urn = urn
-    self._payload = payload.payload() \
-      if isinstance(payload, PayloadBuilder) \
-      else payload
+    self._payload = payload
     self._endpoint = endpoint
     self._namespace = self._fresh_namespace()
 
-  def __post_init__(self, expansion_service):
-    """
-    This will only be invoked if ExternalTransform is used as a base class
-    for a class decorated with dataclasses.dataclass
-    """
-    ExternalTransform.__init__(
-        self,
-        self.URN,
-        DataclassBasedPayloadBuilder(self),
-        expansion_service)
-
   def default_label(self):
     return '%s(%s)' % (self.__class__.__name__, self._urn)
 
diff --git a/sdks/python/apache_beam/transforms/external_test.py b/sdks/python/apache_beam/transforms/external_test.py
index 6576419..e480f62 100644
--- a/sdks/python/apache_beam/transforms/external_test.py
+++ b/sdks/python/apache_beam/transforms/external_test.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 #
 
-"""Unit tests for the transform.external classes."""
+"""Unit tests for the transform.util classes."""
 
 from __future__ import absolute_import
 
@@ -23,7 +23,6 @@ import argparse
 import os
 import subprocess
 import sys
-import typing
 import unittest
 
 import grpc
@@ -33,21 +32,12 @@ from past.builtins import unicode
 
 import apache_beam as beam
 from apache_beam import Pipeline
-from apache_beam.coders import FloatCoder
-from apache_beam.coders import IterableCoder
-from apache_beam.coders import StrUtf8Coder
-from apache_beam.coders import TupleCoder
-from apache_beam.coders import VarIntCoder
 from apache_beam.options.pipeline_options import PipelineOptions
-from apache_beam.portability.api.external_transforms_pb2 import ConfigValue
-from apache_beam.portability.api.external_transforms_pb2 import ExternalConfigurationPayload
 from apache_beam.runners.portability import expansion_service
 from apache_beam.runners.portability.expansion_service_test import FibTransform
 from apache_beam.testing.test_pipeline import TestPipeline
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
-from apache_beam.transforms.external import ImplicitSchemaPayloadBuilder
-from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder
 
 # Protect against environments where apitools library is not available.
 # pylint: disable=wrong-import-order, wrong-import-position
@@ -58,158 +48,6 @@ except ImportError:
 # pylint: enable=wrong-import-order, wrong-import-position
 
 
-def get_payload(args):
-  return ExternalConfigurationPayload(configuration=args)
-
-
-class PayloadBase(object):
-  values = {
-      'integer_example': 1,
-      'string_example': u'thing',
-      'list_of_strings': [u'foo', u'bar'],
-      'optional_kv': (u'key', 1.1),
-      'optional_integer': None,
-  }
-
-  bytes_values = {
-      'integer_example': 1,
-      'string_example': 'thing',
-      'list_of_strings': ['foo', 'bar'],
-      'optional_kv': ('key', 1.1),
-      'optional_integer': None,
-  }
-
-  args = {
-      'integer_example': ConfigValue(
-          coder_urn=['beam:coder:varint:v1'],
-          payload=VarIntCoder()
-          .get_impl().encode_nested(values['integer_example'])),
-      'string_example': ConfigValue(
-          coder_urn=['beam:coder:string_utf8:v1'],
-          payload=StrUtf8Coder()
-          .get_impl().encode_nested(values['string_example'])),
-      'list_of_strings': ConfigValue(
-          coder_urn=['beam:coder:iterable:v1',
-                     'beam:coder:string_utf8:v1'],
-          payload=IterableCoder(StrUtf8Coder())
-          .get_impl().encode_nested(values['list_of_strings'])),
-      'optional_kv': ConfigValue(
-          coder_urn=['beam:coder:kv:v1',
-                     'beam:coder:string_utf8:v1',
-                     'beam:coder:double:v1'],
-          payload=TupleCoder([StrUtf8Coder(), FloatCoder()])
-          .get_impl().encode_nested(values['optional_kv'])),
-  }
-
-  def get_payload_from_typing_hints(self, values):
-    """Return ExternalConfigurationPayload based on python typing hints"""
-    raise NotImplementedError
-
-  def get_payload_from_beam_typehints(self, values):
-    """Return ExternalConfigurationPayload based on beam typehints"""
-    raise NotImplementedError
-
-  def test_typing_payload_builder(self):
-    result = self.get_payload_from_typing_hints(self.values)
-    expected = get_payload(self.args)
-    self.assertEqual(result, expected)
-
-  def test_typing_payload_builder_with_bytes(self):
-    """
-    string_utf8 coder will be used even if values are not unicode in python 2.x
-    """
-    result = self.get_payload_from_typing_hints(self.bytes_values)
-    expected = get_payload(self.args)
-    self.assertEqual(result, expected)
-
-  def test_typehints_payload_builder(self):
-    result = self.get_payload_from_beam_typehints(self.values)
-    expected = get_payload(self.args)
-    self.assertEqual(result, expected)
-
-  def test_typehints_payload_builder_with_bytes(self):
-    """
-    string_utf8 coder will be used even if values are not unicode in python 2.x
-    """
-    result = self.get_payload_from_beam_typehints(self.bytes_values)
-    expected = get_payload(self.args)
-    self.assertEqual(result, expected)
-
-  def test_optional_error(self):
-    """
-    value can only be None if typehint is Optional
-    """
-    with self.assertRaises(RuntimeError):
-      self.get_payload_from_typing_hints({k: None for k in self.values})
-
-
-class ExternalTuplePayloadTest(PayloadBase, unittest.TestCase):
-
-  def get_payload_from_typing_hints(self, values):
-    TestSchema = typing.NamedTuple(
-        'TestSchema',
-        [
-            ('integer_example', int),
-            ('string_example', unicode),
-            ('list_of_strings', typing.List[unicode]),
-            ('optional_kv', typing.Optional[typing.Tuple[unicode, float]]),
-            ('optional_integer', typing.Optional[int]),
-        ]
-    )
-
-    builder = NamedTupleBasedPayloadBuilder(TestSchema(**values))
-    return builder.build()
-
-  def get_payload_from_beam_typehints(self, values):
-    raise unittest.SkipTest("Beam typehints cannot be used with "
-                            "typing.NamedTuple")
-
-
-class ExternalImplicitPayloadTest(unittest.TestCase):
-  """
-  ImplicitSchemaPayloadBuilder works very differently than the other payload
-  builders
-  """
-  def test_implicit_payload_builder(self):
-    builder = ImplicitSchemaPayloadBuilder(PayloadBase.values)
-    result = builder.build()
-    expected = get_payload(PayloadBase.args)
-    self.assertEqual(result, expected)
-
-  def test_implicit_payload_builder_with_bytes(self):
-    values = PayloadBase.bytes_values
-    builder = ImplicitSchemaPayloadBuilder(values)
-    result = builder.build()
-    if sys.version_info[0] < 3:
-      # in python 2.x bytes coder will be inferred
-      args = {
-          'integer_example': ConfigValue(
-              coder_urn=['beam:coder:varint:v1'],
-              payload=VarIntCoder()
-              .get_impl().encode_nested(values['integer_example'])),
-          'string_example': ConfigValue(
-              coder_urn=['beam:coder:bytes:v1'],
-              payload=StrUtf8Coder()
-              .get_impl().encode_nested(values['string_example'])),
-          'list_of_strings': ConfigValue(
-              coder_urn=['beam:coder:iterable:v1',
-                         'beam:coder:bytes:v1'],
-              payload=IterableCoder(StrUtf8Coder())
-              .get_impl().encode_nested(values['list_of_strings'])),
-          'optional_kv': ConfigValue(
-              coder_urn=['beam:coder:kv:v1',
-                         'beam:coder:bytes:v1',
-                         'beam:coder:double:v1'],
-              payload=TupleCoder([StrUtf8Coder(), FloatCoder()])
-              .get_impl().encode_nested(values['optional_kv'])),
-      }
-      expected = get_payload(args)
-      self.assertEqual(result, expected)
-    else:
-      expected = get_payload(PayloadBase.args)
-      self.assertEqual(result, expected)
-
-
 @attr('UsesCrossLanguageTransforms')
 class ExternalTransformTest(unittest.TestCase):
 
diff --git a/sdks/python/apache_beam/transforms/external_test_py3.py b/sdks/python/apache_beam/transforms/external_test_py3.py
deleted file mode 100644
index 88fa870..0000000
--- a/sdks/python/apache_beam/transforms/external_test_py3.py
+++ /dev/null
@@ -1,93 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Unit tests for the transform.external classes."""
-
-from __future__ import absolute_import
-
-import typing
-import unittest
-
-import apache_beam as beam
-from apache_beam import typehints
-from apache_beam.portability.api.external_transforms_pb2 import ExternalConfigurationPayload
-from apache_beam.transforms.external import AnnotationBasedPayloadBuilder
-from apache_beam.transforms.external_test import PayloadBase
-
-
-def get_payload(cls):
-  payload = ExternalConfigurationPayload()
-  payload.ParseFromString(cls._payload)
-  return payload
-
-
-class ExternalAnnotationPayloadTest(PayloadBase, unittest.TestCase):
-
-  def get_payload_from_typing_hints(self, values):
-    class AnnotatedTransform(beam.ExternalTransform):
-      URN = 'beam:external:fakeurn:v1'
-
-      def __init__(self,
-                   integer_example: int,
-                   string_example: str,
-                   list_of_strings: typing.List[str],
-                   optional_kv: typing.Optional[typing.Tuple[str, float]] = None,
-                   optional_integer: typing.Optional[int] = None,
-                   expansion_service=None):
-        super(AnnotatedTransform, self).__init__(
-            self.URN,
-            AnnotationBasedPayloadBuilder(
-                self,
-                integer_example=integer_example,
-                string_example=string_example,
-                list_of_strings=list_of_strings,
-                optional_kv=optional_kv,
-                optional_integer=optional_integer,
-            ),
-            expansion_service
-        )
-
-    return get_payload(AnnotatedTransform(**values))
-
-  def get_payload_from_beam_typehints(self, values):
-    class AnnotatedTransform(beam.ExternalTransform):
-      URN = 'beam:external:fakeurn:v1'
-
-      def __init__(self,
-                   integer_example: int,
-                   string_example: str,
-                   list_of_strings: typehints.List[str],
-                   optional_kv: typehints.Optional[typehints.KV[str, float]] = None,
-                   optional_integer: typehints.Optional[int] = None,
-                   expansion_service=None):
-        super(AnnotatedTransform, self).__init__(
-            self.URN,
-            AnnotationBasedPayloadBuilder(
-                self,
-                integer_example=integer_example,
-                string_example=string_example,
-                list_of_strings=list_of_strings,
-                optional_kv=optional_kv,
-                optional_integer=optional_integer,
-            ),
-            expansion_service
-        )
-
-    return get_payload(AnnotatedTransform(**values))
-
-if __name__ == '__main__':
-  unittest.main()
diff --git a/sdks/python/apache_beam/transforms/external_test_py37.py b/sdks/python/apache_beam/transforms/external_test_py37.py
deleted file mode 100644
index ad1ff72..0000000
--- a/sdks/python/apache_beam/transforms/external_test_py37.py
+++ /dev/null
@@ -1,71 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Unit tests for the transform.external classes."""
-
-from __future__ import absolute_import
-
-import dataclasses
-import typing
-import unittest
-
-import apache_beam as beam
-from apache_beam import typehints
-from apache_beam.portability.api.external_transforms_pb2 import ExternalConfigurationPayload
-from apache_beam.transforms.external_test import PayloadBase
-
-
-def get_payload(cls):
-  payload = ExternalConfigurationPayload()
-  payload.ParseFromString(cls._payload)
-  return payload
-
-
-class ExternalDataclassesPayloadTest(PayloadBase, unittest.TestCase):
-
-  def get_payload_from_typing_hints(self, values):
-
-    @dataclasses.dataclass
-    class DataclassTransform(beam.ExternalTransform):
-      URN = 'beam:external:fakeurn:v1'
-
-      integer_example: int
-      string_example: str
-      list_of_strings: typing.List[str]
-      optional_kv: typing.Optional[typing.Tuple[str, float]] = None
-      optional_integer: typing.Optional[int] = None
-      expansion_service: dataclasses.InitVar[typing.Optional[str]] = None
-
-    return get_payload(DataclassTransform(**values))
-
-  def get_payload_from_beam_typehints(self, values):
-
-    @dataclasses.dataclass
-    class DataclassTransform(beam.ExternalTransform):
-      URN = 'beam:external:fakeurn:v1'
-
-      integer_example: int
-      string_example: str
-      list_of_strings: typehints.List[str]
-      optional_kv: typehints.Optional[typehints.KV[str, float]] = None
-      optional_integer: typehints.Optional[int] = None
-      expansion_service: dataclasses.InitVar[typehints.Optional[str]] = None
-
-    return get_payload(DataclassTransform(**values))
-
-if __name__ == '__main__':
-  unittest.main()
diff --git a/sdks/python/scripts/generate_pydoc.sh b/sdks/python/scripts/generate_pydoc.sh
index 4a29b7a..d74dc62 100755
--- a/sdks/python/scripts/generate_pydoc.sh
+++ b/sdks/python/scripts/generate_pydoc.sh
@@ -77,7 +77,7 @@ excluded_patterns=(
     *_test.py
     *_test_common.py
     # TODO(BEAM-7847): Remove this once doc generation can parse Py3 syntax.
-    *_py3*.py
+    *py3.py
 )
 
 python $(type -p sphinx-apidoc) -fMeT -o target/docs/source apache_beam \
diff --git a/sdks/python/scripts/run_mini_py3lint.sh b/sdks/python/scripts/run_mini_py3lint.sh
index 0bd7e0e..27ca3ce 100755
--- a/sdks/python/scripts/run_mini_py3lint.sh
+++ b/sdks/python/scripts/run_mini_py3lint.sh
@@ -38,24 +38,6 @@ set -o pipefail
 
 MODULE=apache_beam
 
-PYTHON_MINOR=$(python -c 'import sys; print(sys.version_info[1])')
-if [[ "${PYTHON_MINOR}" == 5 ]]; then
-  EXCLUDED_PY3_FILES=$(find ${MODULE} | grep 'py3[6-9]\.py$')
-  echo -e "Excluding Py3 files:\n${EXCLUDED_PY3_FILES}"
-else
-  EXCLUDED_PY3_FILES=""
-fi
-
-FILES_TO_IGNORE=""
-for file in ${EXCLUDED_PY3_FILES}; do
-  if test -z "$FILES_TO_IGNORE"
-    then FILES_TO_IGNORE="$(basename $file)"
-    else FILES_TO_IGNORE="$FILES_TO_IGNORE, $(basename $file)"
-  fi
-done
-
-echo -e "Skipping lint for files:\n${FILES_TO_IGNORE}"
-
 usage(){ echo "Usage: $0 [MODULE|--help]  # The default MODULE is $MODULE"; }
 
 if test $# -gt 0; then
@@ -66,5 +48,4 @@ if test $# -gt 0; then
 fi
 
 echo "Running flake8 for module $MODULE:"
-flake8 $MODULE --count --select=E9,F821,F822,F823 --show-source --statistics \
-  --exclude="${FILES_TO_IGNORE}"
+flake8 $MODULE --count --select=E9,F821,F822,F823 --show-source --statistics
diff --git a/sdks/python/scripts/run_pylint.sh b/sdks/python/scripts/run_pylint.sh
index 27de9a3..608cd96 100755
--- a/sdks/python/scripts/run_pylint.sh
+++ b/sdks/python/scripts/run_pylint.sh
@@ -62,7 +62,7 @@ apache_beam/portability/api/*pb2*.py
 
 PYTHON_MAJOR=$(python -c 'import sys; print(sys.version_info[0])')
 if [[ "${PYTHON_MAJOR}" == 2 ]]; then
-  EXCLUDED_PY3_FILES=$(find ${MODULE} | grep 'py3[0-9]*\.py$')
+  EXCLUDED_PY3_FILES=$(find ${MODULE} | grep 'py3\.py$')
   echo -e "Excluding Py3 files:\n${EXCLUDED_PY3_FILES}"
 else
   EXCLUDED_PY3_FILES=""
diff --git a/sdks/python/setup.cfg b/sdks/python/setup.cfg
index 361fe14..69a5187 100644
--- a/sdks/python/setup.cfg
+++ b/sdks/python/setup.cfg
@@ -51,6 +51,3 @@ exclude_lines =
 
 [coverage:xml]
 output = target/site/cobertura/coverage.xml
-
-[isort]
-known_standard_library = dataclasses
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index 6fe8ecf..f792bbf 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -54,28 +54,28 @@ commands_post =
 [testenv:py27]
 commands =
   python apache_beam/examples/complete/autocomplete_test.py
-  python setup.py nosetests --ignore-files '.*py3\d?\.py$' {posargs}
+  python setup.py nosetests --ignore-files '.*py3.py$' {posargs}
 
 [testenv:py35]
 setenv =
   RUN_SKIPPED_PY3_TESTS=0
 commands =
   python apache_beam/examples/complete/autocomplete_test.py
-  python setup.py nosetests --ignore-files '.*py3[6-9]\.py$' {posargs}
+  python setup.py nosetests {posargs}
 
 [testenv:py36]
 setenv =
   RUN_SKIPPED_PY3_TESTS=0
 commands =
   python apache_beam/examples/complete/autocomplete_test.py
-  python setup.py nosetests --ignore-files '.*py3[7-9]\.py$' {posargs}
+  python setup.py nosetests {posargs}
 
 [testenv:py37]
 setenv =
   RUN_SKIPPED_PY3_TESTS=0
 commands =
   python apache_beam/examples/complete/autocomplete_test.py
-  python setup.py nosetests --ignore-files '.*py3[8-9]\.py$' {posargs}
+  python setup.py nosetests {posargs}
 
 [testenv:py27-cython]
 # cython tests are only expected to work in linux (2.x and 3.x)
@@ -85,7 +85,7 @@ commands =
 platform = linux2
 commands =
   python apache_beam/examples/complete/autocomplete_test.py
-  python setup.py nosetests --ignore-files '.*py3\d?\.py$' {posargs}
+  python setup.py nosetests --ignore-files '.*py3.py$' {posargs}
 
 [testenv:py35-cython]
 # cython tests are only expected to work in linux (2.x and 3.x)
@@ -97,7 +97,7 @@ setenv =
   RUN_SKIPPED_PY3_TESTS=0
 commands =
   python apache_beam/examples/complete/autocomplete_test.py
-  python setup.py nosetests --ignore-files '.*py3[5-9]\.py$' {posargs}
+  python setup.py nosetests {posargs}
 
 [testenv:py36-cython]
 # cython tests are only expected to work in linux (2.x and 3.x)
@@ -109,7 +109,7 @@ setenv =
   RUN_SKIPPED_PY3_TESTS=0
 commands =
   python apache_beam/examples/complete/autocomplete_test.py
-  python setup.py nosetests --ignore-files '.*py3[7-9]\.py$' {posargs}
+  python setup.py nosetests {posargs}
 
 [testenv:py37-cython]
 # cython tests are only expected to work in linux (2.x and 3.x)
@@ -121,13 +121,13 @@ setenv =
   RUN_SKIPPED_PY3_TESTS=0
 commands =
   python apache_beam/examples/complete/autocomplete_test.py
-  python setup.py nosetests --ignore-files '.*py3[8-9]\.py$' {posargs}
+  python setup.py nosetests {posargs}
 
 [testenv:py27-gcp]
 extras = test,gcp
 commands =
   python apache_beam/examples/complete/autocomplete_test.py
-  python setup.py nosetests --ignore-files '.*py3\d?\.py$' {posargs}
+  python setup.py nosetests --ignore-files '.*py3.py$' {posargs}
   # Old and new Datastore client unit tests cannot be run in the same process
   # due to conflicting protobuf modules.
   # TODO(BEAM-4543): Remove these separate nosetests invocations once the
@@ -140,14 +140,14 @@ setenv =
   RUN_SKIPPED_PY3_TESTS=0
 extras = test,gcp
 commands =
-  python setup.py nosetests --ignore-files '.*py3[6-9]\.py$' {posargs}
+  python setup.py nosetests {posargs}
 
 [testenv:py37-gcp]
 setenv =
   RUN_SKIPPED_PY3_TESTS=0
 extras = test,gcp
 commands =
-  python setup.py nosetests --ignore-files '.*py3[8-9]\.py$' {posargs}
+  python setup.py nosetests {posargs}
 
 [testenv:py27-lint]
 deps =