Posted to commits@beam.apache.org by ud...@apache.org on 2021/08/31 17:38:35 UTC
[beam] branch release-2.33.0 updated: Cherry-picking #15402.
This is an automated email from the ASF dual-hosted git repository.
udim pushed a commit to branch release-2.33.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.33.0 by this push:
new 96b0c00 Cherry-picking #15402.
new f1c8722 Merge pull request #15408 from vachan-shetty/cherrypick-15402
96b0c00 is described below
commit 96b0c0058774ee3df1aaaa9abd011a36ce4681ff
Author: vachan-shetty <va...@google.com>
AuthorDate: Fri Aug 27 10:44:10 2021 -0400
Cherry-picking #15402.
---
sdks/python/apache_beam/io/gcp/bigquery.py | 340 +--------------------
.../apache_beam/io/gcp/bigquery_read_it_test.py | 159 ----------
sdks/python/setup.py | 31 +-
3 files changed, 26 insertions(+), 504 deletions(-)
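
For context, the cherry-pick above backs the BigQuery Storage API (DIRECT_READ) read path out of the release-2.33.0 branch, so ReadFromBigQuery on this branch reads only via a BigQuery export job staged on GCS. A minimal sketch of that remaining path is shown below; the project, dataset, table, and bucket names are placeholders, not taken from this commit.

    # Illustrative only: the export-based read that remains after this change.
    # All resource names below are hypothetical.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions(
        project='my-project',                # billing/export project (placeholder)
        temp_location='gs://my-bucket/tmp')  # where the export files get staged

    with beam.Pipeline(options=options) as p:
        rows = (
            p
            | 'Read' >> beam.io.ReadFromBigQuery(
                table='my-project:my_dataset.my_table',
                gcs_location='gs://my-bucket/bq-export')  # optional; temp_location is used otherwise
            | 'Print' >> beam.Map(print))
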
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 4e5f639..88093b1 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -270,7 +270,6 @@ encoding when writing to BigQuery.
# pytype: skip-file
import collections
-import io
import itertools
import json
import logging
@@ -278,20 +277,13 @@ import random
import time
import uuid
from typing import Dict
-from typing import List
-from typing import Optional
from typing import Union
-import avro.schema
-import fastavro
-from avro import io as avroio
-
import apache_beam as beam
from apache_beam import coders
from apache_beam import pvalue
from apache_beam.internal.gcp.json_value import from_json_value
from apache_beam.internal.gcp.json_value import to_json_value
-from apache_beam.io import range_trackers
from apache_beam.io.avroio import _create_avro_source as create_avro_source
from apache_beam.io.filesystems import CompressionTypes
from apache_beam.io.filesystems import FileSystems
@@ -332,7 +324,6 @@ from apache_beam.utils.annotations import experimental
try:
from apache_beam.io.gcp.internal.clients.bigquery import DatasetReference
from apache_beam.io.gcp.internal.clients.bigquery import TableReference
- import google.cloud.bigquery_storage_v1 as bq_storage
except ImportError:
DatasetReference = None
TableReference = None
@@ -896,276 +887,6 @@ class _CustomBigQuerySource(BoundedSource):
return table.schema, metadata_list
-class _CustomBigQueryStorageSourceBase(BoundedSource):
- """A base class for BoundedSource implementations which read from BigQuery
- using the BigQuery Storage API.
-
- Args:
- table (str, TableReference): The ID of the table. The ID must contain only
- letters ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. If the
- **dataset** argument is :data:`None` then the table argument must
- contain the entire table reference specified as:
- ``'PROJECT:DATASET.TABLE'`` or must specify a TableReference.
- dataset (str): Optional ID of the dataset containing this table or
- :data:`None` if the table argument specifies a TableReference.
- project (str): Optional ID of the project containing this table or
- :data:`None` if the table argument specifies a TableReference.
- selected_fields (List[str]): Optional List of names of the fields in the
- table that should be read. If empty, all fields will be read. If the
- specified field is a nested field, all the sub-fields in the field will be
- selected. The output field order is unrelated to the order of fields in
- selected_fields.
- row_restriction (str): Optional SQL text filtering statement, similar to a
- WHERE clause in a query. Aggregates are not supported. Restricted to a
- maximum length of 1 MB.
- """
-
- # The maximum number of streams which will be requested when creating a read
- # session, regardless of the desired bundle size.
- MAX_SPLIT_COUNT = 10000
- # The minimum number of streams which will be requested when creating a read
- # session, regardless of the desired bundle size. Note that the server may
- # still choose to return fewer than ten streams based on the layout of the
- # table.
- MIN_SPLIT_COUNT = 10
-
- def __init__(
- self,
- table: Union[str, TableReference],
- dataset: Optional[str] = None,
- project: Optional[str] = None,
- selected_fields: Optional[List[str]] = None,
- row_restriction: Optional[str] = None,
- use_fastavro_for_direct_read: Optional[bool] = None,
- pipeline_options: Optional[GoogleCloudOptions] = None):
-
- self.table_reference = bigquery_tools.parse_table_reference(
- table, dataset, project)
- self.table = self.table_reference.tableId
- self.dataset = self.table_reference.datasetId
- self.project = self.table_reference.projectId
- self.selected_fields = selected_fields
- self.row_restriction = row_restriction
- self.use_fastavro = \
- True if use_fastavro_for_direct_read is None else \
- use_fastavro_for_direct_read
- self.pipeline_options = pipeline_options
- self.split_result = None
-
- def _get_parent_project(self):
- """Returns the project that will be billed."""
- project = self.pipeline_options.view_as(GoogleCloudOptions).project
- if isinstance(project, vp.ValueProvider):
- project = project.get()
- if not project:
- project = self.project
- return project
-
- def _get_table_size(self, table, dataset, project):
- if project is None:
- project = self._get_parent_project()
-
- bq = bigquery_tools.BigQueryWrapper()
- table = bq.get_table(project, dataset, table)
- return table.numBytes
-
- def display_data(self):
- return {
- 'project': str(self.project),
- 'dataset': str(self.dataset),
- 'table': str(self.table),
- 'selected_fields': str(self.selected_fields),
- 'row_restriction': str(self.row_restriction),
- 'use_fastavro': str(self.use_fastavro)
- }
-
- def estimate_size(self):
- # Returns the pre-filtering size of the table being read.
- return self._get_table_size(self.table, self.dataset, self.project)
-
- def split(self, desired_bundle_size, start_position=None, stop_position=None):
- requested_session = bq_storage.types.ReadSession()
- requested_session.table = 'projects/{}/datasets/{}/tables/{}'.format(
- self.project, self.dataset, self.table)
- requested_session.data_format = bq_storage.types.DataFormat.AVRO
- if self.selected_fields is not None:
- requested_session.read_options.selected_fields = self.selected_fields
- if self.row_restriction is not None:
- requested_session.read_options.row_restriction = self.row_restriction
-
- storage_client = bq_storage.BigQueryReadClient()
- stream_count = 0
- if desired_bundle_size > 0:
- table_size = self._get_table_size(self.table, self.dataset, self.project)
- stream_count = min(
- int(table_size / desired_bundle_size),
- _CustomBigQueryStorageSourceBase.MAX_SPLIT_COUNT)
- stream_count = max(
- stream_count, _CustomBigQueryStorageSourceBase.MIN_SPLIT_COUNT)
-
- parent = 'projects/{}'.format(self.project)
- read_session = storage_client.create_read_session(
- parent=parent,
- read_session=requested_session,
- max_stream_count=stream_count)
- _LOGGER.info(
- 'Sent BigQuery Storage API CreateReadSession request: \n %s \n'
- 'Received response \n %s.',
- requested_session,
- read_session)
-
- self.split_result = [
- _CustomBigQueryStorageStreamSource(stream.name, self.use_fastavro)
- for stream in read_session.streams
- ]
-
- for source in self.split_result:
- yield SourceBundle(
- weight=1.0, source=source, start_position=None, stop_position=None)
-
- def get_range_tracker(self, start_position, stop_position):
- class NonePositionRangeTracker(RangeTracker):
- """A RangeTracker that always returns positions as None. Prevents the
- BigQuery Storage source from being read() before being split()."""
- def start_position(self):
- return None
-
- def stop_position(self):
- return None
-
- return NonePositionRangeTracker()
-
- def read(self, range_tracker):
- raise NotImplementedError(
- 'BigQuery storage source must be split before being read')
-
-
-class _CustomBigQueryStorageStreamSource(BoundedSource):
- """A source representing a single stream in a read session."""
- def __init__(self, read_stream_name: str, use_fastavro: bool):
- self.read_stream_name = read_stream_name
- self.use_fastavro = use_fastavro
-
- def display_data(self):
- return {
- 'read_stream': str(self.read_stream_name),
- }
-
- def estimate_size(self):
- # The size of a stream source cannot be estimated due to server-side
- # liquid sharding.
- # TODO: Implement progress reporting.
- return None
-
- def split(self, desired_bundle_size, start_position=None, stop_position=None):
- # A stream source can't be split without reading from it due to
- # server-side liquid sharding. A split will simply return the current source
- # for now.
- return SourceBundle(
- weight=1.0,
- source=_CustomBigQueryStorageStreamSource(
- self.read_stream_name, self.use_fastavro),
- start_position=None,
- stop_position=None)
-
- def get_range_tracker(self, start_position, stop_position):
- # TODO: Implement dynamic work rebalancing.
- assert start_position is None
- # Defaulting to the start of the stream.
- start_position = 0
- # Since the streams are unsplittable we choose OFFSET_INFINITY as the
- # default end offset so that all data of the source gets read.
- stop_position = range_trackers.OffsetRangeTracker.OFFSET_INFINITY
- range_tracker = range_trackers.OffsetRangeTracker(
- start_position, stop_position)
- # Ensuring that all try_split() calls will be ignored by the RangeTracker.
- range_tracker = range_trackers.UnsplittableRangeTracker(range_tracker)
-
- return range_tracker
-
- def read(self, range_tracker):
- _LOGGER.info(
- "Started BigQuery Storage API read from stream %s.",
- self.read_stream_name)
- storage_client = bq_storage.BigQueryReadClient()
- read_rows_iterator = iter(storage_client.read_rows(self.read_stream_name))
- # Handling the case where the user might provide very selective filters
- # which can result in read_rows_response being empty.
- first_read_rows_response = next(read_rows_iterator, None)
- if first_read_rows_response is None:
- return iter([])
-
- if self.use_fastavro:
- row_reader = _ReadRowsResponseReaderWithFastAvro(
- read_rows_iterator, first_read_rows_response)
- return iter(row_reader)
-
- row_reader = _ReadRowsResponseReader(
- read_rows_iterator, first_read_rows_response)
- return iter(row_reader)
-
-
-class _ReadRowsResponseReaderWithFastAvro():
- """An iterator that deserializes ReadRowsResponses using the fastavro
- library."""
- def __init__(self, read_rows_iterator, read_rows_response):
- self.read_rows_iterator = read_rows_iterator
- self.read_rows_response = read_rows_response
- self.avro_schema = fastavro.parse_schema(
- json.loads(self.read_rows_response.avro_schema.schema))
- self.bytes_reader = io.BytesIO(
- self.read_rows_response.avro_rows.serialized_binary_rows)
-
- def __iter__(self):
- return self
-
- def __next__(self):
- try:
- return fastavro.schemaless_reader(self.bytes_reader, self.avro_schema)
- except StopIteration:
- self.read_rows_response = next(self.read_rows_iterator, None)
- if self.read_rows_response is not None:
- self.bytes_reader = io.BytesIO(
- self.read_rows_response.avro_rows.serialized_binary_rows)
- return fastavro.schemaless_reader(self.bytes_reader, self.avro_schema)
- else:
- raise StopIteration
-
-
-class _ReadRowsResponseReader():
- """An iterator that deserializes ReadRowsResponses."""
- def __init__(self, read_rows_iterator, read_rows_response):
- self.read_rows_iterator = read_rows_iterator
- self.read_rows_response = read_rows_response
- self.avro_schema = avro.schema.Parse(
- self.read_rows_response.avro_schema.schema)
- self.reader = avroio.DatumReader(self.avro_schema)
- self.decoder = avroio.BinaryDecoder(
- io.BytesIO(self.read_rows_response.avro_rows.serialized_binary_rows))
- self.next_row = 0
-
- def __iter__(self):
- return self
-
- def get_deserialized_row(self):
- deserialized_row = self.reader.read(self.decoder)
- self.next_row += 1
- return deserialized_row
-
- def __next__(self):
- if self.next_row < self.read_rows_response.row_count:
- return self.get_deserialized_row()
-
- self.read_rows_response = next(self.read_rows_iterator, None)
- if self.read_rows_response is not None:
- self.decoder = avroio.BinaryDecoder(
- io.BytesIO(self.read_rows_response.avro_rows.serialized_binary_rows))
- self.next_row = 0
- return self.get_deserialized_row()
- else:
- raise StopIteration
-
-
@deprecated(since='2.11.0', current="WriteToBigQuery")
class BigQuerySink(dataflow_io.NativeSink):
"""A sink based on a BigQuery table.
@@ -2120,38 +1841,19 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
class ReadFromBigQuery(PTransform):
"""Read data from BigQuery.
- This PTransform uses either a BigQuery export job to take a snapshot of the
- table on GCS, and then reads from each produced file (EXPORT) or reads
- directly from BigQuery storage using BigQuery Read API (DIRECT_READ). The
- option is specified using the 'method' parameter. File format is Avro by
+ This PTransform uses a BigQuery export job to take a snapshot of the table
+ on GCS, and then reads from each produced file. File format is Avro by
default.
- NOTE: DIRECT_READ only supports reading from BigQuery Tables currently. To
- read the results of a query please use EXPORT.
-
- .. warning::
- DATETIME columns are parsed as strings in the fastavro library. As a
- result, such columns will be converted to Python strings instead of native
- Python DATETIME types.
-
Args:
- method: The method to use to read from BigQuery. It may be EXPORT or
- DIRECT_READ. EXPORT invokes a BigQuery export request
- (https://cloud.google.com/bigquery/docs/exporting-data). DIRECT_READ reads
- directly from BigQuery storage using the BigQuery Read API
- (https://cloud.google.com/bigquery/docs/reference/storage). If
- unspecified, the default is currently EXPORT.
- use_fastavro_for_direct_read (bool): If method is `DIRECT_READ` and
- :data:`True`, the fastavro library is used to deserialize the data
- received from the BigQuery Read API. The default here is :data:`True`.
table (str, callable, ValueProvider): The ID of the table, or a callable
that returns it. The ID must contain only letters ``a-z``, ``A-Z``,
numbers ``0-9``, or underscores ``_``. If dataset argument is
:data:`None` then the table argument must contain the entire table
- reference specified as: ``'PROJECT:DATASET.TABLE'``.
- If it's a callable, it must receive one argument representing an element
- to be written to BigQuery, and return a TableReference, or a string table
- name as specified above.
+ reference specified as: ``'DATASET.TABLE'``
+ or ``'PROJECT:DATASET.TABLE'``. If it's a callable, it must receive one
+ argument representing an element to be written to BigQuery, and return
+ a TableReference, or a string table name as specified above.
dataset (str): The ID of the dataset containing this table or
:data:`None` if the table reference is specified entirely by the table
argument.
@@ -2212,22 +1914,16 @@ class ReadFromBigQuery(PTransform):
to create and delete tables within the given dataset. Dataset name
should *not* start with the reserved prefix `beam_temp_dataset_`.
"""
- class Method(object):
- EXPORT = 'EXPORT' # This is currently the default.
- DIRECT_READ = 'DIRECT_READ'
COUNTER = 0
- def __init__(self, gcs_location=None, method=None, *args, **kwargs):
- self.method = method or ReadFromBigQuery.Method.EXPORT
-
- if gcs_location and self.method is ReadFromBigQuery.Method.EXPORT:
+ def __init__(self, gcs_location=None, *args, **kwargs):
+ if gcs_location:
if not isinstance(gcs_location, (str, ValueProvider)):
raise TypeError(
'%s: gcs_location must be of type string'
' or ValueProvider; got %r instead' %
(self.__class__.__name__, type(gcs_location)))
-
if isinstance(gcs_location, str):
gcs_location = StaticValueProvider(str, gcs_location)
@@ -2236,17 +1932,6 @@ class ReadFromBigQuery(PTransform):
self._kwargs = kwargs
def expand(self, pcoll):
- # TODO(BEAM-11115): Make ReadFromBQ rely on ReadAllFromBQ implementation.
- if self.method is ReadFromBigQuery.Method.EXPORT:
- return self._expand_export(pcoll)
- elif self.method is ReadFromBigQuery.Method.DIRECT_READ:
- return self._expand_direct_read(pcoll)
- else:
- raise ValueError(
- 'The method to read from BigQuery must be either EXPORT'
- 'or DIRECT_READ.')
-
- def _expand_export(self, pcoll):
temp_location = pcoll.pipeline.options.view_as(
GoogleCloudOptions).temp_location
job_name = pcoll.pipeline.options.view_as(GoogleCloudOptions).job_name
@@ -2281,15 +1966,6 @@ class ReadFromBigQuery(PTransform):
**self._kwargs))
| _PassThroughThenCleanup(files_to_remove_pcoll))
- def _expand_direct_read(self, pcoll):
- return (
- pcoll
- | beam.io.Read(
- _CustomBigQueryStorageSourceBase(
- pipeline_options=pcoll.pipeline.options,
- *self._args,
- **self._kwargs)))
-
class ReadFromBigQueryRequest:
"""
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
index 53bf567..9216a9c 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
@@ -26,14 +26,12 @@ import logging
import random
import time
import unittest
-import uuid
from decimal import Decimal
from functools import wraps
import pytest
import apache_beam as beam
-from apache_beam.io.gcp import bigquery_tools
from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.options.value_provider import StaticValueProvider
@@ -177,163 +175,6 @@ class ReadTests(BigQueryReadIntegrationTests):
assert_that(result, equal_to(self.TABLE_DATA))
-class ReadUsingStorageApiTests(BigQueryReadIntegrationTests):
- TABLE_DATA = [{
- 'number': 1, 'str': 'abc'
- }, {
- 'number': 2, 'str': 'def'
- }, {
- 'number': 3, 'str': u'你好'
- }, {
- 'number': 4, 'str': u'привет'
- }]
-
- @classmethod
- def setUpClass(cls):
- super(ReadUsingStorageApiTests, cls).setUpClass()
- cls.table_name = 'python_read_table'
- cls._create_table(cls.table_name)
-
- table_id = '{}.{}'.format(cls.dataset_id, cls.table_name)
- cls.query = 'SELECT number, str FROM `%s`' % table_id
-
- # Materializing the newly created Table to ensure the Read API can stream.
- cls.temp_table_reference = cls._execute_query(cls.project, cls.query)
-
- @classmethod
- def tearDownClass(cls):
- cls.bigquery_client.clean_up_temporary_dataset(cls.project)
- super(ReadUsingStorageApiTests, cls).tearDownClass()
-
- @classmethod
- def _create_table(cls, table_name):
- table_schema = bigquery.TableSchema()
- table_field = bigquery.TableFieldSchema()
- table_field.name = 'number'
- table_field.type = 'INTEGER'
- table_schema.fields.append(table_field)
- table_field = bigquery.TableFieldSchema()
- table_field.name = 'str'
- table_field.type = 'STRING'
- table_schema.fields.append(table_field)
- table = bigquery.Table(
- tableReference=bigquery.TableReference(
- projectId=cls.project, datasetId=cls.dataset_id,
- tableId=table_name),
- schema=table_schema)
- request = bigquery.BigqueryTablesInsertRequest(
- projectId=cls.project, datasetId=cls.dataset_id, table=table)
- cls.bigquery_client.client.tables.Insert(request)
- cls.bigquery_client.insert_rows(
- cls.project, cls.dataset_id, table_name, cls.TABLE_DATA)
-
- @classmethod
- def _setup_temporary_dataset(cls, project, query):
- location = cls.bigquery_client.get_query_location(project, query, False)
- cls.bigquery_client.create_temporary_dataset(project, location)
-
- @classmethod
- def _execute_query(cls, project, query):
- query_job_name = bigquery_tools.generate_bq_job_name(
- 'materializing_table_before_reading',
- str(uuid.uuid4())[0:10],
- bigquery_tools.BigQueryJobTypes.QUERY,
- '%s_%s' % (int(time.time()), random.randint(0, 1000)))
- cls._setup_temporary_dataset(cls.project, cls.query)
- job = cls.bigquery_client._start_query_job(
- project,
- query,
- use_legacy_sql=False,
- flatten_results=False,
- job_id=query_job_name)
- job_ref = job.jobReference
- cls.bigquery_client.wait_for_bq_job(job_ref, max_retries=0)
- return cls.bigquery_client._get_temp_table(project)
-
- def test_iobase_source(self):
- with beam.Pipeline(argv=self.args) as p:
- result = (
- p | 'Read with BigQuery Storage API' >> beam.io.ReadFromBigQuery(
- method=beam.io.ReadFromBigQuery.Method.DIRECT_READ,
- project=self.temp_table_reference.projectId,
- dataset=self.temp_table_reference.datasetId,
- table=self.temp_table_reference.tableId,
- use_fastavro_for_direct_read=True))
- assert_that(result, equal_to(self.TABLE_DATA))
-
- def test_iobase_source_without_fastavro(self):
- with beam.Pipeline(argv=self.args) as p:
- result = (
- p | 'Read with BigQuery Storage API' >> beam.io.ReadFromBigQuery(
- method=beam.io.ReadFromBigQuery.Method.DIRECT_READ,
- project=self.temp_table_reference.projectId,
- dataset=self.temp_table_reference.datasetId,
- table=self.temp_table_reference.tableId,
- use_fastavro_for_direct_read=False))
- assert_that(result, equal_to(self.TABLE_DATA))
-
- def test_iobase_source_with_column_selection(self):
- EXPECTED_TABLE_DATA = [{
- 'number': 1
- }, {
- 'number': 2
- }, {
- 'number': 3
- }, {
- 'number': 4
- }]
- with beam.Pipeline(argv=self.args) as p:
- result = (
- p | 'Read with BigQuery Storage API' >> beam.io.ReadFromBigQuery(
- method=beam.io.ReadFromBigQuery.Method.DIRECT_READ,
- project=self.temp_table_reference.projectId,
- dataset=self.temp_table_reference.datasetId,
- table=self.temp_table_reference.tableId,
- selected_fields=['number']))
- assert_that(result, equal_to(EXPECTED_TABLE_DATA))
-
- def test_iobase_source_with_row_restriction(self):
- EXPECTED_TABLE_DATA = [{
- 'number': 3, 'str': u'你好'
- }, {
- 'number': 4, 'str': u'привет'
- }]
- with beam.Pipeline(argv=self.args) as p:
- result = (
- p | 'Read with BigQuery Storage API' >> beam.io.ReadFromBigQuery(
- method=beam.io.ReadFromBigQuery.Method.DIRECT_READ,
- project=self.temp_table_reference.projectId,
- dataset=self.temp_table_reference.datasetId,
- table=self.temp_table_reference.tableId,
- row_restriction='number > 2'))
- assert_that(result, equal_to(EXPECTED_TABLE_DATA))
-
- def test_iobase_source_with_column_selection_and_row_restriction(self):
- EXPECTED_TABLE_DATA = [{'str': u'你好'}, {'str': u'привет'}]
- with beam.Pipeline(argv=self.args) as p:
- result = (
- p | 'Read with BigQuery Storage API' >> beam.io.ReadFromBigQuery(
- method=beam.io.ReadFromBigQuery.Method.DIRECT_READ,
- project=self.temp_table_reference.projectId,
- dataset=self.temp_table_reference.datasetId,
- table=self.temp_table_reference.tableId,
- selected_fields=['str'],
- row_restriction='number > 2'))
- assert_that(result, equal_to(EXPECTED_TABLE_DATA))
-
- def test_iobase_source_with_very_selective_filters(self):
- with beam.Pipeline(argv=self.args) as p:
- result = (
- p | 'Read with BigQuery Storage API' >> beam.io.ReadFromBigQuery(
- method=beam.io.ReadFromBigQuery.Method.DIRECT_READ,
- project=self.temp_table_reference.projectId,
- dataset=self.temp_table_reference.datasetId,
- table=self.temp_table_reference.tableId,
- selected_fields=['str'],
- row_restriction='number > 4'))
- assert_that(result, equal_to([]))
-
-
class ReadNewTypesTests(BigQueryReadIntegrationTests):
@classmethod
def setUpClass(cls):
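
The removed integration tests above drove the DIRECT_READ source with selected_fields and row_restriction against a materialized temporary table. On release-2.33.0 the equivalent filtering has to be pushed into a query over the export-based path; a minimal sketch under that assumption follows (resource names are placeholders borrowed from the test, pipeline options omitted for brevity).

    # Hypothetical stand-in for the deleted column-selection/row-restriction
    # tests, expressed as a standard-SQL query read via the export path.
    import apache_beam as beam

    with beam.Pipeline() as p:
        filtered = (
            p | 'ReadFiltered' >> beam.io.ReadFromBigQuery(
                query='SELECT str FROM `my-project.my_dataset.python_read_table` '
                      'WHERE number > 2',
                use_standard_sql=True,
                gcs_location='gs://my-bucket/bq-export'))
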
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index f4e02b8..170fbff 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -103,7 +103,6 @@ if StrictVersion(_PIP_VERSION) < StrictVersion(REQUIRED_PIP_VERSION):
)
)
-
REQUIRED_CYTHON_VERSION = '0.28.1'
try:
_CYTHON_VERSION = get_distribution('cython').version
@@ -155,7 +154,7 @@ REQUIRED_PACKAGES = [
'pytz>=2018.3',
'requests>=2.24.0,<3.0.0',
'typing-extensions>=3.7.0,<4',
- ]
+]
# [BEAM-8181] pyarrow cannot be installed on 32-bit Windows platforms.
if sys.platform == 'win32' and sys.maxsize <= 2**32:
@@ -178,7 +177,7 @@ REQUIRED_TEST_PACKAGES = [
'sqlalchemy>=1.3,<2.0',
'psycopg2-binary>=2.8.5,<3.0.0',
'testcontainers>=3.0.3,<4.0.0',
- ]
+]
GCP_REQUIREMENTS = [
'cachetools>=3.1.0,<5',
@@ -186,7 +185,6 @@ GCP_REQUIREMENTS = [
'google-auth>=1.18.0,<2',
'google-cloud-datastore>=1.8.0,<2',
'google-cloud-pubsub>=0.39.0,<2',
- 'google-cloud-bigquery-storage>=2.4.0',
# GCP packages required by tests
'google-cloud-bigquery>=1.6.0,<3',
'google-cloud-core>=0.28.1,<2',
@@ -223,9 +221,7 @@ INTERACTIVE_BEAM_TEST = [
'pillow>=7.1.1,<8',
]
-AWS_REQUIREMENTS = [
- 'boto3 >=1.9'
-]
+AWS_REQUIREMENTS = ['boto3 >=1.9']
AZURE_REQUIREMENTS = [
'azure-storage-blob >=12.3.2',
@@ -233,7 +229,6 @@ AZURE_REQUIREMENTS = [
]
-
# We must generate protos after setup_requires are installed.
def generate_protos_first(original_cmd):
try:
@@ -245,6 +240,7 @@ def generate_protos_first(original_cmd):
def run(self):
gen_protos.generate_proto_files()
super(cmd, self).run()
+
return cmd
except ImportError:
warnings.warn("Could not import gen_protos, skipping proto generation.")
@@ -256,8 +252,8 @@ python_requires = '>=3.6'
if sys.version_info.major == 3 and sys.version_info.minor >= 9:
warnings.warn(
'This version of Apache Beam has not been sufficiently tested on '
- 'Python %s.%s. You may encounter bugs or missing features.' % (
- sys.version_info.major, sys.version_info.minor))
+ 'Python %s.%s. You may encounter bugs or missing features.' %
+ (sys.version_info.major, sys.version_info.minor))
setuptools.setup(
name=PACKAGE_NAME,
@@ -269,9 +265,18 @@ setuptools.setup(
author=PACKAGE_AUTHOR,
author_email=PACKAGE_EMAIL,
packages=setuptools.find_packages(),
- package_data={'apache_beam': [
- '*/*.pyx', '*/*/*.pyx', '*/*.pxd', '*/*/*.pxd', '*/*.h', '*/*/*.h',
- 'testing/data/*.yaml', 'portability/api/*.yaml']},
+ package_data={
+ 'apache_beam': [
+ '*/*.pyx',
+ '*/*/*.pyx',
+ '*/*.pxd',
+ '*/*/*.pxd',
+ '*/*.h',
+ '*/*/*.h',
+ 'testing/data/*.yaml',
+ 'portability/api/*.yaml'
+ ]
+ },
ext_modules=cythonize([
# Make sure to use language_level=3 cython directive in files below.
'apache_beam/**/*.pyx',