You are viewing a plain-text version of this content. The canonical version is available via the original archive page (its hyperlink is not preserved in this plain-text rendering).
Posted to commits@beam.apache.org by al...@apache.org on 2019/09/13 16:11:43 UTC
[beam] 01/01: Revert "[7746] Create a more user friendly external transform API"
This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch revert-9098-py_external_api
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 971c3e343f87f357a6448c41d2445eebcf44a237
Author: Ahmet Altay <aa...@gmail.com>
AuthorDate: Fri Sep 13 09:10:47 2019 -0700
Revert "[7746] Create a more user friendly external transform API"
---
.../construction/expansion/ExpansionService.java | 10 +-
.../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 68 +++++---
.../beam/sdk/io/kafka/KafkaIOExternalTest.java | 47 ++---
.../apache_beam/io/external/generate_sequence.py | 62 +++++--
sdks/python/apache_beam/io/external/kafka.py | 168 +++++++++++-------
sdks/python/apache_beam/transforms/external.py | 189 +--------------------
.../python/apache_beam/transforms/external_test.py | 164 +-----------------
.../apache_beam/transforms/external_test_py3.py | 93 ----------
.../apache_beam/transforms/external_test_py37.py | 71 --------
sdks/python/scripts/generate_pydoc.sh | 2 +-
sdks/python/scripts/run_mini_py3lint.sh | 21 +--
sdks/python/scripts/run_pylint.sh | 2 +-
sdks/python/setup.cfg | 3 -
sdks/python/tox.ini | 22 +--
14 files changed, 240 insertions(+), 682 deletions(-)
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java
index ae1a3d7..795298a 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java
@@ -179,15 +179,11 @@ public class ExpansionService extends ExpansionServiceGrpc.ExpansionServiceImplB
} catch (NoSuchMethodException e) {
throw new RuntimeException(
String.format(
- "The configuration class %s is missing a setter %s for %s with type %s",
- config.getClass(),
- setterName,
- fieldName,
- coder.getEncodedTypeDescriptor().getType().getTypeName()),
+ "The configuration class %s is missing a setter %s for %s",
+ config.getClass(), setterName, fieldName),
e);
}
- method.invoke(
- config, coder.decode(entry.getValue().getPayload().newInput(), Coder.Context.NESTED));
+ method.invoke(config, coder.decode(entry.getValue().getPayload().newInput()));
}
}
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index ca74ecb..373d4b8 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -65,6 +65,7 @@ import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
@@ -401,22 +402,26 @@ public class KafkaIO {
public PTransform<PBegin, PCollection<KV<K, V>>> buildExternal(
External.Configuration config) {
ImmutableList.Builder<String> listBuilder = ImmutableList.builder();
- for (String topic : config.topics) {
- listBuilder.add(topic);
+ for (byte[] topic : config.topics) {
+ listBuilder.add(utf8String(topic));
}
setTopics(listBuilder.build());
- Class keyDeserializer = resolveClass(config.keyDeserializer);
+ String keyDeserializerClassName = utf8String(config.keyDeserializer);
+ Class keyDeserializer = resolveClass(keyDeserializerClassName);
setKeyDeserializer(keyDeserializer);
setKeyCoder(resolveCoder(keyDeserializer));
- Class valueDeserializer = resolveClass(config.valueDeserializer);
+ String valueDeserializerClassName = utf8String(config.valueDeserializer);
+ Class valueDeserializer = resolveClass(valueDeserializerClassName);
setValueDeserializer(valueDeserializer);
setValueCoder(resolveCoder(valueDeserializer));
Map<String, Object> consumerConfig = new HashMap<>();
- for (KV<String, String> kv : config.consumerConfig) {
- consumerConfig.put(kv.getKey(), kv.getValue());
+ for (KV<byte[], byte[]> kv : config.consumerConfig) {
+ String key = utf8String(kv.getKey());
+ String value = utf8String(kv.getValue());
+ consumerConfig.put(key, value);
}
// Key and Value Deserializers always have to be in the config.
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getName());
@@ -475,24 +480,24 @@ public class KafkaIO {
public static class Configuration {
// All byte arrays are UTF-8 encoded strings
- private Iterable<KV<String, String>> consumerConfig;
- private Iterable<String> topics;
- private String keyDeserializer;
- private String valueDeserializer;
+ private Iterable<KV<byte[], byte[]>> consumerConfig;
+ private Iterable<byte[]> topics;
+ private byte[] keyDeserializer;
+ private byte[] valueDeserializer;
- public void setConsumerConfig(Iterable<KV<String, String>> consumerConfig) {
+ public void setConsumerConfig(Iterable<KV<byte[], byte[]>> consumerConfig) {
this.consumerConfig = consumerConfig;
}
- public void setTopics(Iterable<String> topics) {
+ public void setTopics(Iterable<byte[]> topics) {
this.topics = topics;
}
- public void setKeyDeserializer(String keyDeserializer) {
+ public void setKeyDeserializer(byte[] keyDeserializer) {
this.keyDeserializer = keyDeserializer;
}
- public void setValueDeserializer(String valueDeserializer) {
+ public void setValueDeserializer(byte[] valueDeserializer) {
this.valueDeserializer = valueDeserializer;
}
}
@@ -1360,21 +1365,24 @@ public class KafkaIO {
@Override
public PTransform<PCollection<KV<K, V>>, PDone> buildExternal(
External.Configuration configuration) {
- setTopic(configuration.topic);
+ String topic = utf8String(configuration.topic);
+ setTopic(topic);
Map<String, Object> producerConfig = new HashMap<>();
- for (KV<String, String> kv : configuration.producerConfig) {
- producerConfig.put(kv.getKey(), kv.getValue());
+ for (KV<byte[], byte[]> kv : configuration.producerConfig) {
+ String key = utf8String(kv.getKey());
+ String value = utf8String(kv.getValue());
+ producerConfig.put(key, value);
}
- Class keySerializer = resolveClass(configuration.keySerializer);
- Class valSerializer = resolveClass(configuration.valueSerializer);
+ Class keySerializer = resolveClass(utf8String(configuration.keySerializer));
+ Class valSerializer = resolveClass(utf8String(configuration.valueSerializer));
WriteRecords<K, V> writeRecords =
KafkaIO.<K, V>writeRecords()
.withProducerConfigUpdates(producerConfig)
.withKeySerializer(keySerializer)
.withValueSerializer(valSerializer)
- .withTopic(configuration.topic);
+ .withTopic(topic);
setWriteRecordsTransform(writeRecords);
return build();
@@ -1397,24 +1405,24 @@ public class KafkaIO {
public static class Configuration {
// All byte arrays are UTF-8 encoded strings
- private Iterable<KV<String, String>> producerConfig;
- private String topic;
- private String keySerializer;
- private String valueSerializer;
+ private Iterable<KV<byte[], byte[]>> producerConfig;
+ private byte[] topic;
+ private byte[] keySerializer;
+ private byte[] valueSerializer;
- public void setProducerConfig(Iterable<KV<String, String>> producerConfig) {
+ public void setProducerConfig(Iterable<KV<byte[], byte[]>> producerConfig) {
this.producerConfig = producerConfig;
}
- public void setTopic(String topic) {
+ public void setTopic(byte[] topic) {
this.topic = topic;
}
- public void setKeySerializer(String keySerializer) {
+ public void setKeySerializer(byte[] keySerializer) {
this.keySerializer = keySerializer;
}
- public void setValueSerializer(String valueSerializer) {
+ public void setValueSerializer(byte[] valueSerializer) {
this.valueSerializer = valueSerializer;
}
}
@@ -1683,6 +1691,10 @@ public class KafkaIO {
String.format("Could not extract the Kafka Deserializer type from %s", deserializer));
}
+ private static String utf8String(byte[] bytes) {
+ return new String(bytes, Charsets.UTF_8);
+ }
+
private static Class resolveClass(String className) {
try {
return Class.forName(className);
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
index a7b7f8a..83f2cc7 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOExternalTest.java
@@ -32,15 +32,17 @@ import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.core.construction.ReadTranslation;
import org.apache.beam.runners.core.construction.expansion.ExpansionService;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Charsets;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
@@ -74,7 +76,7 @@ public class KafkaIOExternalTest {
"topics",
ExternalTransforms.ConfigValue.newBuilder()
.addCoderUrn("beam:coder:iterable:v1")
- .addCoderUrn("beam:coder:string_utf8:v1")
+ .addCoderUrn("beam:coder:bytes:v1")
.setPayload(ByteString.copyFrom(listAsBytes(topics)))
.build())
.putConfiguration(
@@ -82,20 +84,20 @@ public class KafkaIOExternalTest {
ExternalTransforms.ConfigValue.newBuilder()
.addCoderUrn("beam:coder:iterable:v1")
.addCoderUrn("beam:coder:kv:v1")
- .addCoderUrn("beam:coder:string_utf8:v1")
- .addCoderUrn("beam:coder:string_utf8:v1")
+ .addCoderUrn("beam:coder:bytes:v1")
+ .addCoderUrn("beam:coder:bytes:v1")
.setPayload(ByteString.copyFrom(mapAsBytes(consumerConfig)))
.build())
.putConfiguration(
"key_deserializer",
ExternalTransforms.ConfigValue.newBuilder()
- .addCoderUrn("beam:coder:string_utf8:v1")
+ .addCoderUrn("beam:coder:bytes:v1")
.setPayload(ByteString.copyFrom(encodeString(keyDeserializer)))
.build())
.putConfiguration(
"value_deserializer",
ExternalTransforms.ConfigValue.newBuilder()
- .addCoderUrn("beam:coder:string_utf8:v1")
+ .addCoderUrn("beam:coder:bytes:v1")
.setPayload(ByteString.copyFrom(encodeString(valueDeserializer)))
.build())
.build();
@@ -159,7 +161,7 @@ public class KafkaIOExternalTest {
.putConfiguration(
"topic",
ExternalTransforms.ConfigValue.newBuilder()
- .addCoderUrn("beam:coder:string_utf8:v1")
+ .addCoderUrn("beam:coder:bytes:v1")
.setPayload(ByteString.copyFrom(encodeString(topic)))
.build())
.putConfiguration(
@@ -167,20 +169,20 @@ public class KafkaIOExternalTest {
ExternalTransforms.ConfigValue.newBuilder()
.addCoderUrn("beam:coder:iterable:v1")
.addCoderUrn("beam:coder:kv:v1")
- .addCoderUrn("beam:coder:string_utf8:v1")
- .addCoderUrn("beam:coder:string_utf8:v1")
+ .addCoderUrn("beam:coder:bytes:v1")
+ .addCoderUrn("beam:coder:bytes:v1")
.setPayload(ByteString.copyFrom(mapAsBytes(producerConfig)))
.build())
.putConfiguration(
"key_serializer",
ExternalTransforms.ConfigValue.newBuilder()
- .addCoderUrn("beam:coder:string_utf8:v1")
+ .addCoderUrn("beam:coder:bytes:v1")
.setPayload(ByteString.copyFrom(encodeString(keySerializer)))
.build())
.putConfiguration(
"value_serializer",
ExternalTransforms.ConfigValue.newBuilder()
- .addCoderUrn("beam:coder:string_utf8:v1")
+ .addCoderUrn("beam:coder:bytes:v1")
.setPayload(ByteString.copyFrom(encodeString(valueSerializer)))
.build())
.build();
@@ -246,30 +248,37 @@ public class KafkaIOExternalTest {
}
private static byte[] listAsBytes(List<String> stringList) throws IOException {
- IterableCoder<String> coder = IterableCoder.of(StringUtf8Coder.of());
+ IterableCoder<byte[]> coder = IterableCoder.of(ByteArrayCoder.of());
+ List<byte[]> bytesList =
+ stringList.stream().map(KafkaIOExternalTest::utf8Bytes).collect(Collectors.toList());
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- coder.encode(stringList, baos);
+ coder.encode(bytesList, baos);
return baos.toByteArray();
}
private static byte[] mapAsBytes(Map<String, String> stringMap) throws IOException {
- IterableCoder<KV<String, String>> coder =
- IterableCoder.of(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
- List<KV<String, String>> stringList =
+ IterableCoder<KV<byte[], byte[]>> coder =
+ IterableCoder.of(KvCoder.of(ByteArrayCoder.of(), ByteArrayCoder.of()));
+ List<KV<byte[], byte[]>> bytesList =
stringMap.entrySet().stream()
- .map(kv -> KV.of(kv.getKey(), kv.getValue()))
+ .map(kv -> KV.of(utf8Bytes(kv.getKey()), utf8Bytes(kv.getValue())))
.collect(Collectors.toList());
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- coder.encode(stringList, baos);
+ coder.encode(bytesList, baos);
return baos.toByteArray();
}
private static byte[] encodeString(String str) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- StringUtf8Coder.of().encode(str, baos);
+ ByteArrayCoder.of().encode(utf8Bytes(str), baos);
return baos.toByteArray();
}
+ private static byte[] utf8Bytes(String str) {
+ Preconditions.checkNotNull(str, "String must not be null.");
+ return str.getBytes(Charsets.UTF_8);
+ }
+
private static class TestStreamObserver<T> implements StreamObserver<T> {
private T result;
diff --git a/sdks/python/apache_beam/io/external/generate_sequence.py b/sdks/python/apache_beam/io/external/generate_sequence.py
index a17ec7b..0e0b1fd 100644
--- a/sdks/python/apache_beam/io/external/generate_sequence.py
+++ b/sdks/python/apache_beam/io/external/generate_sequence.py
@@ -17,11 +17,15 @@
from __future__ import absolute_import
-from apache_beam.transforms.external import ExternalTransform
-from apache_beam.transforms.external import ImplicitSchemaPayloadBuilder
+from apache_beam import ExternalTransform
+from apache_beam import pvalue
+from apache_beam.coders import VarIntCoder
+from apache_beam.portability.api.external_transforms_pb2 import ConfigValue
+from apache_beam.portability.api.external_transforms_pb2 import ExternalConfigurationPayload
+from apache_beam.transforms import ptransform
-class GenerateSequence(ExternalTransform):
+class GenerateSequence(ptransform.PTransform):
"""
An external PTransform which provides a bounded or unbounded stream of
integers.
@@ -45,19 +49,47 @@ class GenerateSequence(ExternalTransform):
Experimental; no backwards compatibility guarantees.
"""
+
URN = 'beam:external:java:generate_sequence:v1'
def __init__(self, start, stop=None,
elements_per_period=None, max_read_time=None,
- expansion_service=None):
- super(GenerateSequence, self).__init__(
- self.URN,
- ImplicitSchemaPayloadBuilder(
- {
- 'start': start,
- 'stop': stop,
- 'elements_per_period': elements_per_period,
- 'max_read_time': max_read_time,
- }
- ),
- expansion_service)
+ expansion_service='localhost:8097'):
+ super(GenerateSequence, self).__init__()
+ self.start = start
+ self.stop = stop
+ self.elements_per_period = elements_per_period
+ self.max_read_time = max_read_time
+ self.expansion_service = expansion_service
+
+ def expand(self, pbegin):
+ if not isinstance(pbegin, pvalue.PBegin):
+ raise Exception("GenerateSequence must be a root transform")
+
+ coder = VarIntCoder()
+ coder_urn = ['beam:coder:varint:v1']
+ args = {
+ 'start':
+ ConfigValue(
+ coder_urn=coder_urn,
+ payload=coder.encode(self.start))
+ }
+ if self.stop:
+ args['stop'] = ConfigValue(
+ coder_urn=coder_urn,
+ payload=coder.encode(self.stop))
+ if self.elements_per_period:
+ args['elements_per_period'] = ConfigValue(
+ coder_urn=coder_urn,
+ payload=coder.encode(self.elements_per_period))
+ if self.max_read_time:
+ args['max_read_time'] = ConfigValue(
+ coder_urn=coder_urn,
+ payload=coder.encode(self.max_read_time))
+
+ payload = ExternalConfigurationPayload(configuration=args)
+ return pbegin.apply(
+ ExternalTransform(
+ self.URN,
+ payload.SerializeToString(),
+ self.expansion_service))
diff --git a/sdks/python/apache_beam/io/external/kafka.py b/sdks/python/apache_beam/io/external/kafka.py
index f824515..ed24c00 100644
--- a/sdks/python/apache_beam/io/external/kafka.py
+++ b/sdks/python/apache_beam/io/external/kafka.py
@@ -37,25 +37,18 @@
from __future__ import absolute_import
-import typing
-
-from past.builtins import unicode
-
-from apache_beam.transforms.external import ExternalTransform
-from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder
-
-ReadFromKafkaSchema = typing.NamedTuple(
- 'ReadFromKafkaSchema',
- [
- ('consumer_config', typing.List[typing.Tuple[unicode, unicode]]),
- ('topics', typing.List[unicode]),
- ('key_deserializer', unicode),
- ('value_deserializer', unicode),
- ]
-)
-
-
-class ReadFromKafka(ExternalTransform):
+from apache_beam import ExternalTransform
+from apache_beam import pvalue
+from apache_beam.coders import BytesCoder
+from apache_beam.coders import IterableCoder
+from apache_beam.coders import TupleCoder
+from apache_beam.coders.coders import LengthPrefixCoder
+from apache_beam.portability.api.external_transforms_pb2 import ConfigValue
+from apache_beam.portability.api.external_transforms_pb2 import ExternalConfigurationPayload
+from apache_beam.transforms import ptransform
+
+
+class ReadFromKafka(ptransform.PTransform):
"""
An external PTransform which reads from Kafka and returns a KV pair for
each item in the specified Kafka topics. If no Kafka Deserializer for
@@ -71,13 +64,11 @@ class ReadFromKafka(ExternalTransform):
byte_array_deserializer = 'org.apache.kafka.common.serialization.' \
'ByteArrayDeserializer'
- URN = 'beam:external:java:kafka:read:v1'
-
def __init__(self, consumer_config,
topics,
key_deserializer=byte_array_deserializer,
value_deserializer=byte_array_deserializer,
- expansion_service=None):
+ expansion_service='localhost:8097'):
"""
Initializes a read operation from Kafka.
@@ -97,32 +88,38 @@ class ReadFromKafka(ExternalTransform):
serialization.ByteArrayDeserializer'.
:param expansion_service: The address (host:port) of the ExpansionService.
"""
- super(ReadFromKafka, self).__init__(
- self.URN,
- NamedTupleBasedPayloadBuilder(
- ReadFromKafkaSchema(
- consumer_config=list(consumer_config.items()),
- topics=topics,
- key_deserializer=key_deserializer,
- value_deserializer=value_deserializer,
- )
- ),
- expansion_service
- )
-
-
-WriteToKafkaSchema = typing.NamedTuple(
- 'WriteToKafkaSchema',
- [
- ('producer_config', typing.List[typing.Tuple[unicode, unicode]]),
- ('topic', unicode),
- ('key_serializer', unicode),
- ('value_serializer', unicode),
- ]
-)
-
-
-class WriteToKafka(ExternalTransform):
+ super(ReadFromKafka, self).__init__()
+ self._urn = 'beam:external:java:kafka:read:v1'
+ self.consumer_config = consumer_config
+ self.topics = topics
+ self.key_deserializer = key_deserializer
+ self.value_deserializer = value_deserializer
+ self.expansion_service = expansion_service
+
+ def expand(self, pbegin):
+ if not isinstance(pbegin, pvalue.PBegin):
+ raise Exception("ReadFromKafka must be a root transform")
+
+ args = {
+ 'consumer_config':
+ _encode_map(self.consumer_config),
+ 'topics':
+ _encode_list(self.topics),
+ 'key_deserializer':
+ _encode_str(self.key_deserializer),
+ 'value_deserializer':
+ _encode_str(self.value_deserializer),
+ }
+
+ payload = ExternalConfigurationPayload(configuration=args)
+ return pbegin.apply(
+ ExternalTransform(
+ self._urn,
+ payload.SerializeToString(),
+ self.expansion_service))
+
+
+class WriteToKafka(ptransform.PTransform):
"""
An external PTransform which writes KV data to a specified Kafka topic.
If no Kafka Serializer for key/value is provided, then key/value are
@@ -135,13 +132,11 @@ class WriteToKafka(ExternalTransform):
byte_array_serializer = 'org.apache.kafka.common.serialization.' \
'ByteArraySerializer'
- URN = 'beam:external:java:kafka:write:v1'
-
def __init__(self, producer_config,
topic,
key_serializer=byte_array_serializer,
value_serializer=byte_array_serializer,
- expansion_service=None):
+ expansion_service='localhost:8097'):
"""
Initializes a write operation to Kafka.
@@ -161,15 +156,62 @@ class WriteToKafka(ExternalTransform):
serialization.ByteArraySerializer'.
:param expansion_service: The address (host:port) of the ExpansionService.
"""
- super(WriteToKafka, self).__init__(
- self.URN,
- NamedTupleBasedPayloadBuilder(
- WriteToKafkaSchema(
- producer_config=list(producer_config.items()),
- topic=topic,
- key_serializer=key_serializer,
- value_serializer=value_serializer,
- )
- ),
- expansion_service
- )
+ super(WriteToKafka, self).__init__()
+ self._urn = 'beam:external:java:kafka:write:v1'
+ self.producer_config = producer_config
+ self.topic = topic
+ self.key_serializer = key_serializer
+ self.value_serializer = value_serializer
+ self.expansion_service = expansion_service
+
+ def expand(self, pvalue):
+ args = {
+ 'producer_config':
+ _encode_map(self.producer_config),
+ 'topic':
+ _encode_str(self.topic),
+ 'key_serializer':
+ _encode_str(self.key_serializer),
+ 'value_serializer':
+ _encode_str(self.value_serializer),
+ }
+
+ payload = ExternalConfigurationPayload(configuration=args)
+ return pvalue.apply(
+ ExternalTransform(
+ self._urn,
+ payload.SerializeToString(),
+ self.expansion_service))
+
+
+def _encode_map(dict_obj):
+ kv_list = [(key.encode('utf-8'), val.encode('utf-8'))
+ for key, val in dict_obj.items()]
+ coder = IterableCoder(TupleCoder(
+ [LengthPrefixCoder(BytesCoder()), LengthPrefixCoder(BytesCoder())]))
+ coder_urns = ['beam:coder:iterable:v1',
+ 'beam:coder:kv:v1',
+ 'beam:coder:bytes:v1',
+ 'beam:coder:bytes:v1']
+ return ConfigValue(
+ coder_urn=coder_urns,
+ payload=coder.encode(kv_list))
+
+
+def _encode_list(list_obj):
+ encoded_list = [val.encode('utf-8') for val in list_obj]
+ coder = IterableCoder(LengthPrefixCoder(BytesCoder()))
+ coder_urns = ['beam:coder:iterable:v1',
+ 'beam:coder:bytes:v1']
+ return ConfigValue(
+ coder_urn=coder_urns,
+ payload=coder.encode(encoded_list))
+
+
+def _encode_str(str_obj):
+ encoded_str = str_obj.encode('utf-8')
+ coder = LengthPrefixCoder(BytesCoder())
+ coder_urns = ['beam:coder:bytes:v1']
+ return ConfigValue(
+ coder_urn=coder_urns,
+ payload=coder.encode(encoded_str))
diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py
index fd79fcf..a7e9fb5 100644
--- a/sdks/python/apache_beam/transforms/external.py
+++ b/sdks/python/apache_beam/transforms/external.py
@@ -27,18 +27,11 @@ import copy
import threading
from apache_beam import pvalue
-from apache_beam.coders import registry
from apache_beam.portability import common_urns
from apache_beam.portability.api import beam_expansion_api_pb2
from apache_beam.portability.api import beam_runner_api_pb2
-from apache_beam.portability.api.external_transforms_pb2 import ConfigValue
-from apache_beam.portability.api.external_transforms_pb2 import ExternalConfigurationPayload
from apache_beam.runners import pipeline_context
from apache_beam.transforms import ptransform
-from apache_beam.typehints.native_type_compatibility import convert_to_beam_type
-from apache_beam.typehints.trivial_inference import instance_to_type
-from apache_beam.typehints.typehints import Union
-from apache_beam.typehints.typehints import UnionConstraint
# Protect against environments where grpc is not available.
# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
@@ -49,170 +42,6 @@ except ImportError:
grpc = None
# pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
-DEFAULT_EXPANSION_SERVICE = 'localhost:8097'
-
-
-def _is_optional_or_none(typehint):
- return (type(None) in typehint.union_types
- if isinstance(typehint, UnionConstraint) else typehint is type(None))
-
-
-def _strip_optional(typehint):
- if not _is_optional_or_none(typehint):
- return typehint
- new_types = typehint.union_types.difference({type(None)})
- if len(new_types) == 1:
- return list(new_types)[0]
- return Union[new_types]
-
-
-def iter_urns(coder, context=None):
- yield coder.to_runner_api_parameter(context)[0]
- for child in coder._get_component_coders():
- for urn in iter_urns(child, context):
- yield urn
-
-
-class PayloadBuilder(object):
- """
- Abstract base class for building payloads to pass to ExternalTransform.
- """
-
- @classmethod
- def _config_value(cls, obj, typehint):
- """
- Helper to create a ConfigValue with an encoded value.
- """
- coder = registry.get_coder(typehint)
- urns = list(iter_urns(coder))
- if 'beam:coder:pickled_python:v1' in urns:
- raise RuntimeError("Found non-portable coder for %s" % (typehint,))
- return ConfigValue(
- coder_urn=urns,
- payload=coder.get_impl().encode_nested(obj))
-
- def build(self):
- """
- :return: ExternalConfigurationPayload
- """
- raise NotImplementedError
-
- def payload(self):
- """
- The serialized ExternalConfigurationPayload
-
- :return: bytes
- """
- return self.build().SerializeToString()
-
-
-class SchemaBasedPayloadBuilder(PayloadBuilder):
- """
- Base class for building payloads based on a schema that provides
- type information for each configuration value to encode.
-
- Note that if the schema defines a type as Optional, the corresponding value
- will be omitted from the encoded payload, and thus the native transform
- will determine the default.
- """
-
- def __init__(self, values, schema):
- """
- :param values: mapping of config names to values
- :param schema: mapping of config names to types
- """
- self._values = values
- self._schema = schema
-
- @classmethod
- def _encode_config(cls, values, schema):
- result = {}
- for key, value in values.items():
-
- try:
- typehint = schema[key]
- except KeyError:
- raise RuntimeError("No typehint provided for key %r" % key)
-
- typehint = convert_to_beam_type(typehint)
-
- if value is None:
- if not _is_optional_or_none(typehint):
- raise RuntimeError("If value is None, typehint should be "
- "optional. Got %r" % typehint)
- # make it easy for user to filter None by default
- continue
- else:
- # strip Optional from typehint so that pickled_python coder is not used
- # for known types.
- typehint = _strip_optional(typehint)
- result[key] = cls._config_value(value, typehint)
- return result
-
- def build(self):
- """
- :return: ExternalConfigurationPayload
- """
- args = self._encode_config(self._values, self._schema)
- return ExternalConfigurationPayload(configuration=args)
-
-
-class ImplicitSchemaPayloadBuilder(SchemaBasedPayloadBuilder):
- """
- Build a payload that generates a schema from the provided values.
- """
- def __init__(self, values):
- schema = {key: instance_to_type(value) for key, value in values.items()}
- super(ImplicitSchemaPayloadBuilder, self).__init__(values, schema)
-
-
-class NamedTupleBasedPayloadBuilder(SchemaBasedPayloadBuilder):
- """
- Build a payload based on a NamedTuple schema.
- """
- def __init__(self, tuple_instance):
- """
- :param tuple_instance: an instance of a typing.NamedTuple
- """
- super(NamedTupleBasedPayloadBuilder, self).__init__(
- values=tuple_instance._asdict(), schema=tuple_instance._field_types)
-
-
-class AnnotationBasedPayloadBuilder(SchemaBasedPayloadBuilder):
- """
- Build a payload based on an external transform's type annotations.
-
- Supported in python 3 only.
- """
- def __init__(self, transform, **values):
- """
- :param transform: a PTransform instance or class. type annotations will
- be gathered from its __init__ method
- :param values: values to encode
- """
- schema = {k: v for k, v in
- transform.__init__.__annotations__.items()
- if k in values}
- super(AnnotationBasedPayloadBuilder, self).__init__(values, schema)
-
-
-class DataclassBasedPayloadBuilder(SchemaBasedPayloadBuilder):
- """
- Build a payload based on an external transform that uses dataclasses.
-
- Supported in python 3 only.
- """
- def __init__(self, transform):
- """
- :param transform: a dataclass-decorated PTransform instance from which to
- gather type annotations and values
- """
- import dataclasses
- schema = {field.name: field.type for field in
- dataclasses.fields(transform)}
- super(DataclassBasedPayloadBuilder, self).__init__(
- dataclasses.asdict(transform), schema)
-
class ExternalTransform(ptransform.PTransform):
"""
@@ -227,29 +56,15 @@ class ExternalTransform(ptransform.PTransform):
_EXPANDED_TRANSFORM_UNIQUE_NAME = 'root'
_IMPULSE_PREFIX = 'impulse'
- def __init__(self, urn, payload, endpoint=None):
- endpoint = endpoint or DEFAULT_EXPANSION_SERVICE
+ def __init__(self, urn, payload, endpoint):
if grpc is None and isinstance(endpoint, str):
raise NotImplementedError('Grpc required for external transforms.')
# TODO: Start an endpoint given an environment?
self._urn = urn
- self._payload = payload.payload() \
- if isinstance(payload, PayloadBuilder) \
- else payload
+ self._payload = payload
self._endpoint = endpoint
self._namespace = self._fresh_namespace()
- def __post_init__(self, expansion_service):
- """
- This will only be invoked if ExternalTransform is used as a base class
- for a class decorated with dataclasses.dataclass
- """
- ExternalTransform.__init__(
- self,
- self.URN,
- DataclassBasedPayloadBuilder(self),
- expansion_service)
-
def default_label(self):
return '%s(%s)' % (self.__class__.__name__, self._urn)
diff --git a/sdks/python/apache_beam/transforms/external_test.py b/sdks/python/apache_beam/transforms/external_test.py
index 6576419..e480f62 100644
--- a/sdks/python/apache_beam/transforms/external_test.py
+++ b/sdks/python/apache_beam/transforms/external_test.py
@@ -15,7 +15,7 @@
# limitations under the License.
#
-"""Unit tests for the transform.external classes."""
+"""Unit tests for the transform.util classes."""
from __future__ import absolute_import
@@ -23,7 +23,6 @@ import argparse
import os
import subprocess
import sys
-import typing
import unittest
import grpc
@@ -33,21 +32,12 @@ from past.builtins import unicode
import apache_beam as beam
from apache_beam import Pipeline
-from apache_beam.coders import FloatCoder
-from apache_beam.coders import IterableCoder
-from apache_beam.coders import StrUtf8Coder
-from apache_beam.coders import TupleCoder
-from apache_beam.coders import VarIntCoder
from apache_beam.options.pipeline_options import PipelineOptions
-from apache_beam.portability.api.external_transforms_pb2 import ConfigValue
-from apache_beam.portability.api.external_transforms_pb2 import ExternalConfigurationPayload
from apache_beam.runners.portability import expansion_service
from apache_beam.runners.portability.expansion_service_test import FibTransform
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
-from apache_beam.transforms.external import ImplicitSchemaPayloadBuilder
-from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder
# Protect against environments where apitools library is not available.
# pylint: disable=wrong-import-order, wrong-import-position
@@ -58,158 +48,6 @@ except ImportError:
# pylint: enable=wrong-import-order, wrong-import-position
-def get_payload(args):
- return ExternalConfigurationPayload(configuration=args)
-
-
-class PayloadBase(object):
- values = {
- 'integer_example': 1,
- 'string_example': u'thing',
- 'list_of_strings': [u'foo', u'bar'],
- 'optional_kv': (u'key', 1.1),
- 'optional_integer': None,
- }
-
- bytes_values = {
- 'integer_example': 1,
- 'string_example': 'thing',
- 'list_of_strings': ['foo', 'bar'],
- 'optional_kv': ('key', 1.1),
- 'optional_integer': None,
- }
-
- args = {
- 'integer_example': ConfigValue(
- coder_urn=['beam:coder:varint:v1'],
- payload=VarIntCoder()
- .get_impl().encode_nested(values['integer_example'])),
- 'string_example': ConfigValue(
- coder_urn=['beam:coder:string_utf8:v1'],
- payload=StrUtf8Coder()
- .get_impl().encode_nested(values['string_example'])),
- 'list_of_strings': ConfigValue(
- coder_urn=['beam:coder:iterable:v1',
- 'beam:coder:string_utf8:v1'],
- payload=IterableCoder(StrUtf8Coder())
- .get_impl().encode_nested(values['list_of_strings'])),
- 'optional_kv': ConfigValue(
- coder_urn=['beam:coder:kv:v1',
- 'beam:coder:string_utf8:v1',
- 'beam:coder:double:v1'],
- payload=TupleCoder([StrUtf8Coder(), FloatCoder()])
- .get_impl().encode_nested(values['optional_kv'])),
- }
-
- def get_payload_from_typing_hints(self, values):
- """Return ExternalConfigurationPayload based on python typing hints"""
- raise NotImplementedError
-
- def get_payload_from_beam_typehints(self, values):
- """Return ExternalConfigurationPayload based on beam typehints"""
- raise NotImplementedError
-
- def test_typing_payload_builder(self):
- result = self.get_payload_from_typing_hints(self.values)
- expected = get_payload(self.args)
- self.assertEqual(result, expected)
-
- def test_typing_payload_builder_with_bytes(self):
- """
- string_utf8 coder will be used even if values are not unicode in python 2.x
- """
- result = self.get_payload_from_typing_hints(self.bytes_values)
- expected = get_payload(self.args)
- self.assertEqual(result, expected)
-
- def test_typehints_payload_builder(self):
- result = self.get_payload_from_beam_typehints(self.values)
- expected = get_payload(self.args)
- self.assertEqual(result, expected)
-
- def test_typehints_payload_builder_with_bytes(self):
- """
- string_utf8 coder will be used even if values are not unicode in python 2.x
- """
- result = self.get_payload_from_beam_typehints(self.bytes_values)
- expected = get_payload(self.args)
- self.assertEqual(result, expected)
-
- def test_optional_error(self):
- """
- value can only be None if typehint is Optional
- """
- with self.assertRaises(RuntimeError):
- self.get_payload_from_typing_hints({k: None for k in self.values})
-
-
-class ExternalTuplePayloadTest(PayloadBase, unittest.TestCase):
-
- def get_payload_from_typing_hints(self, values):
- TestSchema = typing.NamedTuple(
- 'TestSchema',
- [
- ('integer_example', int),
- ('string_example', unicode),
- ('list_of_strings', typing.List[unicode]),
- ('optional_kv', typing.Optional[typing.Tuple[unicode, float]]),
- ('optional_integer', typing.Optional[int]),
- ]
- )
-
- builder = NamedTupleBasedPayloadBuilder(TestSchema(**values))
- return builder.build()
-
- def get_payload_from_beam_typehints(self, values):
- raise unittest.SkipTest("Beam typehints cannot be used with "
- "typing.NamedTuple")
-
-
-class ExternalImplicitPayloadTest(unittest.TestCase):
- """
- ImplicitSchemaPayloadBuilder works very differently than the other payload
- builders
- """
- def test_implicit_payload_builder(self):
- builder = ImplicitSchemaPayloadBuilder(PayloadBase.values)
- result = builder.build()
- expected = get_payload(PayloadBase.args)
- self.assertEqual(result, expected)
-
- def test_implicit_payload_builder_with_bytes(self):
- values = PayloadBase.bytes_values
- builder = ImplicitSchemaPayloadBuilder(values)
- result = builder.build()
- if sys.version_info[0] < 3:
- # in python 2.x bytes coder will be inferred
- args = {
- 'integer_example': ConfigValue(
- coder_urn=['beam:coder:varint:v1'],
- payload=VarIntCoder()
- .get_impl().encode_nested(values['integer_example'])),
- 'string_example': ConfigValue(
- coder_urn=['beam:coder:bytes:v1'],
- payload=StrUtf8Coder()
- .get_impl().encode_nested(values['string_example'])),
- 'list_of_strings': ConfigValue(
- coder_urn=['beam:coder:iterable:v1',
- 'beam:coder:bytes:v1'],
- payload=IterableCoder(StrUtf8Coder())
- .get_impl().encode_nested(values['list_of_strings'])),
- 'optional_kv': ConfigValue(
- coder_urn=['beam:coder:kv:v1',
- 'beam:coder:bytes:v1',
- 'beam:coder:double:v1'],
- payload=TupleCoder([StrUtf8Coder(), FloatCoder()])
- .get_impl().encode_nested(values['optional_kv'])),
- }
- expected = get_payload(args)
- self.assertEqual(result, expected)
- else:
- expected = get_payload(PayloadBase.args)
- self.assertEqual(result, expected)
-
-
@attr('UsesCrossLanguageTransforms')
class ExternalTransformTest(unittest.TestCase):
diff --git a/sdks/python/apache_beam/transforms/external_test_py3.py b/sdks/python/apache_beam/transforms/external_test_py3.py
deleted file mode 100644
index 88fa870..0000000
--- a/sdks/python/apache_beam/transforms/external_test_py3.py
+++ /dev/null
@@ -1,93 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Unit tests for the transform.external classes."""
-
-from __future__ import absolute_import
-
-import typing
-import unittest
-
-import apache_beam as beam
-from apache_beam import typehints
-from apache_beam.portability.api.external_transforms_pb2 import ExternalConfigurationPayload
-from apache_beam.transforms.external import AnnotationBasedPayloadBuilder
-from apache_beam.transforms.external_test import PayloadBase
-
-
-def get_payload(cls):
- payload = ExternalConfigurationPayload()
- payload.ParseFromString(cls._payload)
- return payload
-
-
-class ExternalAnnotationPayloadTest(PayloadBase, unittest.TestCase):
-
- def get_payload_from_typing_hints(self, values):
- class AnnotatedTransform(beam.ExternalTransform):
- URN = 'beam:external:fakeurn:v1'
-
- def __init__(self,
- integer_example: int,
- string_example: str,
- list_of_strings: typing.List[str],
- optional_kv: typing.Optional[typing.Tuple[str, float]] = None,
- optional_integer: typing.Optional[int] = None,
- expansion_service=None):
- super(AnnotatedTransform, self).__init__(
- self.URN,
- AnnotationBasedPayloadBuilder(
- self,
- integer_example=integer_example,
- string_example=string_example,
- list_of_strings=list_of_strings,
- optional_kv=optional_kv,
- optional_integer=optional_integer,
- ),
- expansion_service
- )
-
- return get_payload(AnnotatedTransform(**values))
-
- def get_payload_from_beam_typehints(self, values):
- class AnnotatedTransform(beam.ExternalTransform):
- URN = 'beam:external:fakeurn:v1'
-
- def __init__(self,
- integer_example: int,
- string_example: str,
- list_of_strings: typehints.List[str],
- optional_kv: typehints.Optional[typehints.KV[str, float]] = None,
- optional_integer: typehints.Optional[int] = None,
- expansion_service=None):
- super(AnnotatedTransform, self).__init__(
- self.URN,
- AnnotationBasedPayloadBuilder(
- self,
- integer_example=integer_example,
- string_example=string_example,
- list_of_strings=list_of_strings,
- optional_kv=optional_kv,
- optional_integer=optional_integer,
- ),
- expansion_service
- )
-
- return get_payload(AnnotatedTransform(**values))
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/sdks/python/apache_beam/transforms/external_test_py37.py b/sdks/python/apache_beam/transforms/external_test_py37.py
deleted file mode 100644
index ad1ff72..0000000
--- a/sdks/python/apache_beam/transforms/external_test_py37.py
+++ /dev/null
@@ -1,71 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Unit tests for the transform.external classes."""
-
-from __future__ import absolute_import
-
-import dataclasses
-import typing
-import unittest
-
-import apache_beam as beam
-from apache_beam import typehints
-from apache_beam.portability.api.external_transforms_pb2 import ExternalConfigurationPayload
-from apache_beam.transforms.external_test import PayloadBase
-
-
-def get_payload(cls):
- payload = ExternalConfigurationPayload()
- payload.ParseFromString(cls._payload)
- return payload
-
-
-class ExternalDataclassesPayloadTest(PayloadBase, unittest.TestCase):
-
- def get_payload_from_typing_hints(self, values):
-
- @dataclasses.dataclass
- class DataclassTransform(beam.ExternalTransform):
- URN = 'beam:external:fakeurn:v1'
-
- integer_example: int
- string_example: str
- list_of_strings: typing.List[str]
- optional_kv: typing.Optional[typing.Tuple[str, float]] = None
- optional_integer: typing.Optional[int] = None
- expansion_service: dataclasses.InitVar[typing.Optional[str]] = None
-
- return get_payload(DataclassTransform(**values))
-
- def get_payload_from_beam_typehints(self, values):
-
- @dataclasses.dataclass
- class DataclassTransform(beam.ExternalTransform):
- URN = 'beam:external:fakeurn:v1'
-
- integer_example: int
- string_example: str
- list_of_strings: typehints.List[str]
- optional_kv: typehints.Optional[typehints.KV[str, float]] = None
- optional_integer: typehints.Optional[int] = None
- expansion_service: dataclasses.InitVar[typehints.Optional[str]] = None
-
- return get_payload(DataclassTransform(**values))
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/sdks/python/scripts/generate_pydoc.sh b/sdks/python/scripts/generate_pydoc.sh
index 4a29b7a..d74dc62 100755
--- a/sdks/python/scripts/generate_pydoc.sh
+++ b/sdks/python/scripts/generate_pydoc.sh
@@ -77,7 +77,7 @@ excluded_patterns=(
*_test.py
*_test_common.py
# TODO(BEAM-7847): Remove this once doc generation can parse Py3 syntax.
- *_py3*.py
+ *py3.py
)
python $(type -p sphinx-apidoc) -fMeT -o target/docs/source apache_beam \
diff --git a/sdks/python/scripts/run_mini_py3lint.sh b/sdks/python/scripts/run_mini_py3lint.sh
index 0bd7e0e..27ca3ce 100755
--- a/sdks/python/scripts/run_mini_py3lint.sh
+++ b/sdks/python/scripts/run_mini_py3lint.sh
@@ -38,24 +38,6 @@ set -o pipefail
MODULE=apache_beam
-PYTHON_MINOR=$(python -c 'import sys; print(sys.version_info[1])')
-if [[ "${PYTHON_MINOR}" == 5 ]]; then
- EXCLUDED_PY3_FILES=$(find ${MODULE} | grep 'py3[6-9]\.py$')
- echo -e "Excluding Py3 files:\n${EXCLUDED_PY3_FILES}"
-else
- EXCLUDED_PY3_FILES=""
-fi
-
-FILES_TO_IGNORE=""
-for file in ${EXCLUDED_PY3_FILES}; do
- if test -z "$FILES_TO_IGNORE"
- then FILES_TO_IGNORE="$(basename $file)"
- else FILES_TO_IGNORE="$FILES_TO_IGNORE, $(basename $file)"
- fi
-done
-
-echo -e "Skipping lint for files:\n${FILES_TO_IGNORE}"
-
usage(){ echo "Usage: $0 [MODULE|--help] # The default MODULE is $MODULE"; }
if test $# -gt 0; then
@@ -66,5 +48,4 @@ if test $# -gt 0; then
fi
echo "Running flake8 for module $MODULE:"
-flake8 $MODULE --count --select=E9,F821,F822,F823 --show-source --statistics \
- --exclude="${FILES_TO_IGNORE}"
+flake8 $MODULE --count --select=E9,F821,F822,F823 --show-source --statistics
diff --git a/sdks/python/scripts/run_pylint.sh b/sdks/python/scripts/run_pylint.sh
index 27de9a3..608cd96 100755
--- a/sdks/python/scripts/run_pylint.sh
+++ b/sdks/python/scripts/run_pylint.sh
@@ -62,7 +62,7 @@ apache_beam/portability/api/*pb2*.py
PYTHON_MAJOR=$(python -c 'import sys; print(sys.version_info[0])')
if [[ "${PYTHON_MAJOR}" == 2 ]]; then
- EXCLUDED_PY3_FILES=$(find ${MODULE} | grep 'py3[0-9]*\.py$')
+ EXCLUDED_PY3_FILES=$(find ${MODULE} | grep 'py3\.py$')
echo -e "Excluding Py3 files:\n${EXCLUDED_PY3_FILES}"
else
EXCLUDED_PY3_FILES=""
diff --git a/sdks/python/setup.cfg b/sdks/python/setup.cfg
index 361fe14..69a5187 100644
--- a/sdks/python/setup.cfg
+++ b/sdks/python/setup.cfg
@@ -51,6 +51,3 @@ exclude_lines =
[coverage:xml]
output = target/site/cobertura/coverage.xml
-
-[isort]
-known_standard_library = dataclasses
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index 6fe8ecf..f792bbf 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -54,28 +54,28 @@ commands_post =
[testenv:py27]
commands =
python apache_beam/examples/complete/autocomplete_test.py
- python setup.py nosetests --ignore-files '.*py3\d?\.py$' {posargs}
+ python setup.py nosetests --ignore-files '.*py3.py$' {posargs}
[testenv:py35]
setenv =
RUN_SKIPPED_PY3_TESTS=0
commands =
python apache_beam/examples/complete/autocomplete_test.py
- python setup.py nosetests --ignore-files '.*py3[6-9]\.py$' {posargs}
+ python setup.py nosetests {posargs}
[testenv:py36]
setenv =
RUN_SKIPPED_PY3_TESTS=0
commands =
python apache_beam/examples/complete/autocomplete_test.py
- python setup.py nosetests --ignore-files '.*py3[7-9]\.py$' {posargs}
+ python setup.py nosetests {posargs}
[testenv:py37]
setenv =
RUN_SKIPPED_PY3_TESTS=0
commands =
python apache_beam/examples/complete/autocomplete_test.py
- python setup.py nosetests --ignore-files '.*py3[8-9]\.py$' {posargs}
+ python setup.py nosetests {posargs}
[testenv:py27-cython]
# cython tests are only expected to work in linux (2.x and 3.x)
@@ -85,7 +85,7 @@ commands =
platform = linux2
commands =
python apache_beam/examples/complete/autocomplete_test.py
- python setup.py nosetests --ignore-files '.*py3\d?\.py$' {posargs}
+ python setup.py nosetests --ignore-files '.*py3.py$' {posargs}
[testenv:py35-cython]
# cython tests are only expected to work in linux (2.x and 3.x)
@@ -97,7 +97,7 @@ setenv =
RUN_SKIPPED_PY3_TESTS=0
commands =
python apache_beam/examples/complete/autocomplete_test.py
- python setup.py nosetests --ignore-files '.*py3[5-9]\.py$' {posargs}
+ python setup.py nosetests {posargs}
[testenv:py36-cython]
# cython tests are only expected to work in linux (2.x and 3.x)
@@ -109,7 +109,7 @@ setenv =
RUN_SKIPPED_PY3_TESTS=0
commands =
python apache_beam/examples/complete/autocomplete_test.py
- python setup.py nosetests --ignore-files '.*py3[7-9]\.py$' {posargs}
+ python setup.py nosetests {posargs}
[testenv:py37-cython]
# cython tests are only expected to work in linux (2.x and 3.x)
@@ -121,13 +121,13 @@ setenv =
RUN_SKIPPED_PY3_TESTS=0
commands =
python apache_beam/examples/complete/autocomplete_test.py
- python setup.py nosetests --ignore-files '.*py3[8-9]\.py$' {posargs}
+ python setup.py nosetests {posargs}
[testenv:py27-gcp]
extras = test,gcp
commands =
python apache_beam/examples/complete/autocomplete_test.py
- python setup.py nosetests --ignore-files '.*py3\d?\.py$' {posargs}
+ python setup.py nosetests --ignore-files '.*py3.py$' {posargs}
# Old and new Datastore client unit tests cannot be run in the same process
# due to conflicting protobuf modules.
# TODO(BEAM-4543): Remove these separate nosetests invocations once the
@@ -140,14 +140,14 @@ setenv =
RUN_SKIPPED_PY3_TESTS=0
extras = test,gcp
commands =
- python setup.py nosetests --ignore-files '.*py3[6-9]\.py$' {posargs}
+ python setup.py nosetests {posargs}
[testenv:py37-gcp]
setenv =
RUN_SKIPPED_PY3_TESTS=0
extras = test,gcp
commands =
- python setup.py nosetests --ignore-files '.*py3[8-9]\.py$' {posargs}
+ python setup.py nosetests {posargs}
[testenv:py27-lint]
deps =