Posted to commits@beam.apache.org by ch...@apache.org on 2021/08/27 13:52:22 UTC

[beam] branch master updated: [BEAM-12810] Reverting PR-15185 (#15402)

This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 92b9977  [BEAM-12810] Reverting PR-15185 (#15402)
92b9977 is described below

commit 92b9977d71dfbf3ce7d3f42fe0d6cdb8e384a0d0
Author: Vachan <52...@users.noreply.github.com>
AuthorDate: Fri Aug 27 09:51:36 2021 -0400

    [BEAM-12810] Reverting PR-15185 (#15402)
    
    * Reverting PR-15185
    
    * Lint fixes.
    
    * Formatting fixes.
    
    * More formatting fixes.
---
 sdks/python/apache_beam/io/gcp/bigquery.py         | 340 +--------------------
 .../apache_beam/io/gcp/bigquery_read_it_test.py    | 159 ----------
 sdks/python/setup.py                               |  31 +-
 3 files changed, 26 insertions(+), 504 deletions(-)
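
The revert leaves the export-based code path as the only ReadFromBigQuery
implementation. As a rough sketch of the usage that remains supported after
this change (the project, table, and bucket names below are placeholders,
not values taken from this commit):

    # Export-based read: Beam snapshots the table to GCS with a BigQuery
    # export job and then reads the produced Avro files.
    import apache_beam as beam

    with beam.Pipeline() as p:
        rows = (
            p
            | 'ReadTable' >> beam.io.ReadFromBigQuery(
                table='my-project:my_dataset.my_table',
                gcs_location='gs://my-bucket/tmp'))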

diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 4e5f639..88093b1 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -270,7 +270,6 @@ encoding when writing to BigQuery.
 # pytype: skip-file
 
 import collections
-import io
 import itertools
 import json
 import logging
@@ -278,20 +277,13 @@ import random
 import time
 import uuid
 from typing import Dict
-from typing import List
-from typing import Optional
 from typing import Union
 
-import avro.schema
-import fastavro
-from avro import io as avroio
-
 import apache_beam as beam
 from apache_beam import coders
 from apache_beam import pvalue
 from apache_beam.internal.gcp.json_value import from_json_value
 from apache_beam.internal.gcp.json_value import to_json_value
-from apache_beam.io import range_trackers
 from apache_beam.io.avroio import _create_avro_source as create_avro_source
 from apache_beam.io.filesystems import CompressionTypes
 from apache_beam.io.filesystems import FileSystems
@@ -332,7 +324,6 @@ from apache_beam.utils.annotations import experimental
 try:
   from apache_beam.io.gcp.internal.clients.bigquery import DatasetReference
   from apache_beam.io.gcp.internal.clients.bigquery import TableReference
-  import google.cloud.bigquery_storage_v1 as bq_storage
 except ImportError:
   DatasetReference = None
   TableReference = None
@@ -896,276 +887,6 @@ class _CustomBigQuerySource(BoundedSource):
     return table.schema, metadata_list
 
 
-class _CustomBigQueryStorageSourceBase(BoundedSource):
-  """A base class for BoundedSource implementations which read from BigQuery
-  using the BigQuery Storage API.
-
-  Args:
-    table (str, TableReference): The ID of the table. The ID must contain only
-      letters ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``. If the
-      **dataset** argument is :data:`None` then the table argument must
-      contain the entire table reference specified as:
-      ``'PROJECT:DATASET.TABLE'`` or must specify a TableReference.
-    dataset (str): Optional ID of the dataset containing this table or
-      :data:`None` if the table argument specifies a TableReference.
-    project (str): Optional ID of the project containing this table or
-      :data:`None` if the table argument specifies a TableReference.
-    selected_fields (List[str]): Optional List of names of the fields in the
-      table that should be read. If empty, all fields will be read. If the
-      specified field is a nested field, all the sub-fields in the field will be
-      selected. The output field order is unrelated to the order of fields in
-      selected_fields.
-    row_restriction (str): Optional SQL text filtering statement, similar to a
-      WHERE clause in a query. Aggregates are not supported. Restricted to a
-      maximum length of 1 MB.
-  """
-
-  # The maximum number of streams which will be requested when creating a read
-  # session, regardless of the desired bundle size.
-  MAX_SPLIT_COUNT = 10000
-  # The minimum number of streams which will be requested when creating a read
-  # session, regardless of the desired bundle size. Note that the server may
-  # still choose to return fewer than ten streams based on the layout of the
-  # table.
-  MIN_SPLIT_COUNT = 10
-
-  def __init__(
-      self,
-      table: Union[str, TableReference],
-      dataset: Optional[str] = None,
-      project: Optional[str] = None,
-      selected_fields: Optional[List[str]] = None,
-      row_restriction: Optional[str] = None,
-      use_fastavro_for_direct_read: Optional[bool] = None,
-      pipeline_options: Optional[GoogleCloudOptions] = None):
-
-    self.table_reference = bigquery_tools.parse_table_reference(
-        table, dataset, project)
-    self.table = self.table_reference.tableId
-    self.dataset = self.table_reference.datasetId
-    self.project = self.table_reference.projectId
-    self.selected_fields = selected_fields
-    self.row_restriction = row_restriction
-    self.use_fastavro = \
-      True if use_fastavro_for_direct_read is None else \
-      use_fastavro_for_direct_read
-    self.pipeline_options = pipeline_options
-    self.split_result = None
-
-  def _get_parent_project(self):
-    """Returns the project that will be billed."""
-    project = self.pipeline_options.view_as(GoogleCloudOptions).project
-    if isinstance(project, vp.ValueProvider):
-      project = project.get()
-    if not project:
-      project = self.project
-    return project
-
-  def _get_table_size(self, table, dataset, project):
-    if project is None:
-      project = self._get_parent_project()
-
-    bq = bigquery_tools.BigQueryWrapper()
-    table = bq.get_table(project, dataset, table)
-    return table.numBytes
-
-  def display_data(self):
-    return {
-        'project': str(self.project),
-        'dataset': str(self.dataset),
-        'table': str(self.table),
-        'selected_fields': str(self.selected_fields),
-        'row_restriction': str(self.row_restriction),
-        'use_fastavro': str(self.use_fastavro)
-    }
-
-  def estimate_size(self):
-    # Returns the pre-filtering size of the table being read.
-    return self._get_table_size(self.table, self.dataset, self.project)
-
-  def split(self, desired_bundle_size, start_position=None, stop_position=None):
-    requested_session = bq_storage.types.ReadSession()
-    requested_session.table = 'projects/{}/datasets/{}/tables/{}'.format(
-        self.project, self.dataset, self.table)
-    requested_session.data_format = bq_storage.types.DataFormat.AVRO
-    if self.selected_fields is not None:
-      requested_session.read_options.selected_fields = self.selected_fields
-    if self.row_restriction is not None:
-      requested_session.read_options.row_restriction = self.row_restriction
-
-    storage_client = bq_storage.BigQueryReadClient()
-    stream_count = 0
-    if desired_bundle_size > 0:
-      table_size = self._get_table_size(self.table, self.dataset, self.project)
-      stream_count = min(
-          int(table_size / desired_bundle_size),
-          _CustomBigQueryStorageSourceBase.MAX_SPLIT_COUNT)
-    stream_count = max(
-        stream_count, _CustomBigQueryStorageSourceBase.MIN_SPLIT_COUNT)
-
-    parent = 'projects/{}'.format(self.project)
-    read_session = storage_client.create_read_session(
-        parent=parent,
-        read_session=requested_session,
-        max_stream_count=stream_count)
-    _LOGGER.info(
-        'Sent BigQuery Storage API CreateReadSession request: \n %s \n'
-        'Received response \n %s.',
-        requested_session,
-        read_session)
-
-    self.split_result = [
-        _CustomBigQueryStorageStreamSource(stream.name, self.use_fastavro)
-        for stream in read_session.streams
-    ]
-
-    for source in self.split_result:
-      yield SourceBundle(
-          weight=1.0, source=source, start_position=None, stop_position=None)
-
-  def get_range_tracker(self, start_position, stop_position):
-    class NonePositionRangeTracker(RangeTracker):
-      """A RangeTracker that always returns positions as None. Prevents the
-      BigQuery Storage source from being read() before being split()."""
-      def start_position(self):
-        return None
-
-      def stop_position(self):
-        return None
-
-    return NonePositionRangeTracker()
-
-  def read(self, range_tracker):
-    raise NotImplementedError(
-        'BigQuery storage source must be split before being read')
-
-
-class _CustomBigQueryStorageStreamSource(BoundedSource):
-  """A source representing a single stream in a read session."""
-  def __init__(self, read_stream_name: str, use_fastavro: bool):
-    self.read_stream_name = read_stream_name
-    self.use_fastavro = use_fastavro
-
-  def display_data(self):
-    return {
-        'read_stream': str(self.read_stream_name),
-    }
-
-  def estimate_size(self):
-    # The size of a stream source cannot be estimated due to server-side liquid
-    # sharding.
-    # TODO: Implement progress reporting.
-    return None
-
-  def split(self, desired_bundle_size, start_position=None, stop_position=None):
-    # A stream source can't be split without reading from it due to
-    # server-side liquid sharding. A split will simply return the current source
-    # for now.
-    return SourceBundle(
-        weight=1.0,
-        source=_CustomBigQueryStorageStreamSource(
-            self.read_stream_name, self.use_fastavro),
-        start_position=None,
-        stop_position=None)
-
-  def get_range_tracker(self, start_position, stop_position):
-    # TODO: Implement dynamic work rebalancing.
-    assert start_position is None
-    # Defaulting to the start of the stream.
-    start_position = 0
-    # Since the streams are unsplittable we choose OFFSET_INFINITY as the
-    # default end offset so that all data of the source gets read.
-    stop_position = range_trackers.OffsetRangeTracker.OFFSET_INFINITY
-    range_tracker = range_trackers.OffsetRangeTracker(
-        start_position, stop_position)
-    # Ensuring that all try_split() calls will be ignored by the RangeTracker.
-    range_tracker = range_trackers.UnsplittableRangeTracker(range_tracker)
-
-    return range_tracker
-
-  def read(self, range_tracker):
-    _LOGGER.info(
-        "Started BigQuery Storage API read from stream %s.",
-        self.read_stream_name)
-    storage_client = bq_storage.BigQueryReadClient()
-    read_rows_iterator = iter(storage_client.read_rows(self.read_stream_name))
-    # Handling the case where the user might provide very selective filters
-    # which can result in read_rows_response being empty.
-    first_read_rows_response = next(read_rows_iterator, None)
-    if first_read_rows_response is None:
-      return iter([])
-
-    if self.use_fastavro:
-      row_reader = _ReadRowsResponseReaderWithFastAvro(
-          read_rows_iterator, first_read_rows_response)
-      return iter(row_reader)
-
-    row_reader = _ReadRowsResponseReader(
-        read_rows_iterator, first_read_rows_response)
-    return iter(row_reader)
-
-
-class _ReadRowsResponseReaderWithFastAvro():
-  """An iterator that deserializes ReadRowsResponses using the fastavro
-  library."""
-  def __init__(self, read_rows_iterator, read_rows_response):
-    self.read_rows_iterator = read_rows_iterator
-    self.read_rows_response = read_rows_response
-    self.avro_schema = fastavro.parse_schema(
-        json.loads(self.read_rows_response.avro_schema.schema))
-    self.bytes_reader = io.BytesIO(
-        self.read_rows_response.avro_rows.serialized_binary_rows)
-
-  def __iter__(self):
-    return self
-
-  def __next__(self):
-    try:
-      return fastavro.schemaless_reader(self.bytes_reader, self.avro_schema)
-    except StopIteration:
-      self.read_rows_response = next(self.read_rows_iterator, None)
-      if self.read_rows_response is not None:
-        self.bytes_reader = io.BytesIO(
-            self.read_rows_response.avro_rows.serialized_binary_rows)
-        return fastavro.schemaless_reader(self.bytes_reader, self.avro_schema)
-      else:
-        raise StopIteration
-
-
-class _ReadRowsResponseReader():
-  """An iterator that deserializes ReadRowsResponses."""
-  def __init__(self, read_rows_iterator, read_rows_response):
-    self.read_rows_iterator = read_rows_iterator
-    self.read_rows_response = read_rows_response
-    self.avro_schema = avro.schema.Parse(
-        self.read_rows_response.avro_schema.schema)
-    self.reader = avroio.DatumReader(self.avro_schema)
-    self.decoder = avroio.BinaryDecoder(
-        io.BytesIO(self.read_rows_response.avro_rows.serialized_binary_rows))
-    self.next_row = 0
-
-  def __iter__(self):
-    return self
-
-  def get_deserialized_row(self):
-    deserialized_row = self.reader.read(self.decoder)
-    self.next_row += 1
-    return deserialized_row
-
-  def __next__(self):
-    if self.next_row < self.read_rows_response.row_count:
-      return self.get_deserialized_row()
-
-    self.read_rows_response = next(self.read_rows_iterator, None)
-    if self.read_rows_response is not None:
-      self.decoder = avroio.BinaryDecoder(
-          io.BytesIO(self.read_rows_response.avro_rows.serialized_binary_rows))
-      self.next_row = 0
-      return self.get_deserialized_row()
-    else:
-      raise StopIteration
-
-
 @deprecated(since='2.11.0', current="WriteToBigQuery")
 class BigQuerySink(dataflow_io.NativeSink):
   """A sink based on a BigQuery table.
@@ -2120,38 +1841,19 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
 class ReadFromBigQuery(PTransform):
   """Read data from BigQuery.
 
-    This PTransform uses either a BigQuery export job to take a snapshot of the
-    table on GCS, and then reads from each produced file (EXPORT) or reads
-    directly from BigQuery storage using the BigQuery Read API (DIRECT_READ).
-    The option is specified via the ``method`` parameter. File format is Avro by
+    This PTransform uses a BigQuery export job to take a snapshot of the table
+    on GCS, and then reads from each produced file. File format is Avro by
     default.
 
-    NOTE: DIRECT_READ currently only supports reading from BigQuery tables. To
-    read the results of a query, please use EXPORT.
-
-  .. warning::
-      DATETIME columns are parsed as strings in the fastavro library. As a
-      result, such columns will be converted to Python strings instead of native
-      Python DATETIME types.
-
   Args:
-    method: The method to use to read from BigQuery. It may be EXPORT or
-      DIRECT_READ. EXPORT invokes a BigQuery export request
-      (https://cloud.google.com/bigquery/docs/exporting-data). DIRECT_READ reads
-      directly from BigQuery storage using the BigQuery Read API
-      (https://cloud.google.com/bigquery/docs/reference/storage). If
-      unspecified, the default is currently EXPORT.
-    use_fastavro_for_direct_read (bool): If method is `DIRECT_READ` and
-       :data:`True`, the fastavro library is used to deserialize the data
-       received from the BigQuery Read API. The default here is :data:`True`.
     table (str, callable, ValueProvider): The ID of the table, or a callable
       that returns it. The ID must contain only letters ``a-z``, ``A-Z``,
       numbers ``0-9``, or underscores ``_``. If dataset argument is
       :data:`None` then the table argument must contain the entire table
-      reference specified as: ``'PROJECT:DATASET.TABLE'``.
-      If it's a callable, it must receive one argument representing an element
-      to be written to BigQuery, and return a TableReference, or a string table
-      name as specified above.
+      reference specified as: ``'DATASET.TABLE'``
+      or ``'PROJECT:DATASET.TABLE'``. If it's a callable, it must receive one
+      argument representing an element to be written to BigQuery, and return
+      a TableReference, or a string table name as specified above.
     dataset (str): The ID of the dataset containing this table or
       :data:`None` if the table reference is specified entirely by the table
       argument.
@@ -2212,22 +1914,16 @@ class ReadFromBigQuery(PTransform):
         to create and delete tables within the given dataset. Dataset name
         should *not* start with the reserved prefix `beam_temp_dataset_`.
    """
-  class Method(object):
-    EXPORT = 'EXPORT'  #  This is currently the default.
-    DIRECT_READ = 'DIRECT_READ'
 
   COUNTER = 0
 
-  def __init__(self, gcs_location=None, method=None, *args, **kwargs):
-    self.method = method or ReadFromBigQuery.Method.EXPORT
-
-    if gcs_location and self.method is ReadFromBigQuery.Method.EXPORT:
+  def __init__(self, gcs_location=None, *args, **kwargs):
+    if gcs_location:
       if not isinstance(gcs_location, (str, ValueProvider)):
         raise TypeError(
             '%s: gcs_location must be of type string'
             ' or ValueProvider; got %r instead' %
             (self.__class__.__name__, type(gcs_location)))
-
       if isinstance(gcs_location, str):
         gcs_location = StaticValueProvider(str, gcs_location)
 
@@ -2236,17 +1932,6 @@ class ReadFromBigQuery(PTransform):
     self._kwargs = kwargs
 
   def expand(self, pcoll):
-    # TODO(BEAM-11115): Make ReadFromBQ rely on ReadAllFromBQ implementation.
-    if self.method is ReadFromBigQuery.Method.EXPORT:
-      return self._expand_export(pcoll)
-    elif self.method is ReadFromBigQuery.Method.DIRECT_READ:
-      return self._expand_direct_read(pcoll)
-    else:
-      raise ValueError(
-          'The method to read from BigQuery must be either EXPORT '
-          'or DIRECT_READ.')
-
-  def _expand_export(self, pcoll):
     temp_location = pcoll.pipeline.options.view_as(
         GoogleCloudOptions).temp_location
     job_name = pcoll.pipeline.options.view_as(GoogleCloudOptions).job_name
@@ -2281,15 +1966,6 @@ class ReadFromBigQuery(PTransform):
                 **self._kwargs))
         | _PassThroughThenCleanup(files_to_remove_pcoll))
 
-  def _expand_direct_read(self, pcoll):
-    return (
-        pcoll
-        | beam.io.Read(
-            _CustomBigQueryStorageSourceBase(
-                pipeline_options=pcoll.pipeline.options,
-                *self._args,
-                **self._kwargs)))
-
 
 class ReadFromBigQueryRequest:
   """
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
index 53bf567..9216a9c 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
@@ -26,14 +26,12 @@ import logging
 import random
 import time
 import unittest
-import uuid
 from decimal import Decimal
 from functools import wraps
 
 import pytest
 
 import apache_beam as beam
-from apache_beam.io.gcp import bigquery_tools
 from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
 from apache_beam.io.gcp.internal.clients import bigquery
 from apache_beam.options.value_provider import StaticValueProvider
@@ -177,163 +175,6 @@ class ReadTests(BigQueryReadIntegrationTests):
       assert_that(result, equal_to(self.TABLE_DATA))
 
 
-class ReadUsingStorageApiTests(BigQueryReadIntegrationTests):
-  TABLE_DATA = [{
-      'number': 1, 'str': 'abc'
-  }, {
-      'number': 2, 'str': 'def'
-  }, {
-      'number': 3, 'str': u'你好'
-  }, {
-      'number': 4, 'str': u'привет'
-  }]
-
-  @classmethod
-  def setUpClass(cls):
-    super(ReadUsingStorageApiTests, cls).setUpClass()
-    cls.table_name = 'python_read_table'
-    cls._create_table(cls.table_name)
-
-    table_id = '{}.{}'.format(cls.dataset_id, cls.table_name)
-    cls.query = 'SELECT number, str FROM `%s`' % table_id
-
-    # Materializing the newly created Table to ensure the Read API can stream.
-    cls.temp_table_reference = cls._execute_query(cls.project, cls.query)
-
-  @classmethod
-  def tearDownClass(cls):
-    cls.bigquery_client.clean_up_temporary_dataset(cls.project)
-    super(ReadUsingStorageApiTests, cls).tearDownClass()
-
-  @classmethod
-  def _create_table(cls, table_name):
-    table_schema = bigquery.TableSchema()
-    table_field = bigquery.TableFieldSchema()
-    table_field.name = 'number'
-    table_field.type = 'INTEGER'
-    table_schema.fields.append(table_field)
-    table_field = bigquery.TableFieldSchema()
-    table_field.name = 'str'
-    table_field.type = 'STRING'
-    table_schema.fields.append(table_field)
-    table = bigquery.Table(
-        tableReference=bigquery.TableReference(
-            projectId=cls.project, datasetId=cls.dataset_id,
-            tableId=table_name),
-        schema=table_schema)
-    request = bigquery.BigqueryTablesInsertRequest(
-        projectId=cls.project, datasetId=cls.dataset_id, table=table)
-    cls.bigquery_client.client.tables.Insert(request)
-    cls.bigquery_client.insert_rows(
-        cls.project, cls.dataset_id, table_name, cls.TABLE_DATA)
-
-  @classmethod
-  def _setup_temporary_dataset(cls, project, query):
-    location = cls.bigquery_client.get_query_location(project, query, False)
-    cls.bigquery_client.create_temporary_dataset(project, location)
-
-  @classmethod
-  def _execute_query(cls, project, query):
-    query_job_name = bigquery_tools.generate_bq_job_name(
-        'materializing_table_before_reading',
-        str(uuid.uuid4())[0:10],
-        bigquery_tools.BigQueryJobTypes.QUERY,
-        '%s_%s' % (int(time.time()), random.randint(0, 1000)))
-    cls._setup_temporary_dataset(cls.project, cls.query)
-    job = cls.bigquery_client._start_query_job(
-        project,
-        query,
-        use_legacy_sql=False,
-        flatten_results=False,
-        job_id=query_job_name)
-    job_ref = job.jobReference
-    cls.bigquery_client.wait_for_bq_job(job_ref, max_retries=0)
-    return cls.bigquery_client._get_temp_table(project)
-
-  def test_iobase_source(self):
-    with beam.Pipeline(argv=self.args) as p:
-      result = (
-          p | 'Read with BigQuery Storage API' >> beam.io.ReadFromBigQuery(
-              method=beam.io.ReadFromBigQuery.Method.DIRECT_READ,
-              project=self.temp_table_reference.projectId,
-              dataset=self.temp_table_reference.datasetId,
-              table=self.temp_table_reference.tableId,
-              use_fastavro_for_direct_read=True))
-      assert_that(result, equal_to(self.TABLE_DATA))
-
-  def test_iobase_source_without_fastavro(self):
-    with beam.Pipeline(argv=self.args) as p:
-      result = (
-          p | 'Read with BigQuery Storage API' >> beam.io.ReadFromBigQuery(
-              method=beam.io.ReadFromBigQuery.Method.DIRECT_READ,
-              project=self.temp_table_reference.projectId,
-              dataset=self.temp_table_reference.datasetId,
-              table=self.temp_table_reference.tableId,
-              use_fastavro_for_direct_read=False))
-      assert_that(result, equal_to(self.TABLE_DATA))
-
-  def test_iobase_source_with_column_selection(self):
-    EXPECTED_TABLE_DATA = [{
-        'number': 1
-    }, {
-        'number': 2
-    }, {
-        'number': 3
-    }, {
-        'number': 4
-    }]
-    with beam.Pipeline(argv=self.args) as p:
-      result = (
-          p | 'Read with BigQuery Storage API' >> beam.io.ReadFromBigQuery(
-              method=beam.io.ReadFromBigQuery.Method.DIRECT_READ,
-              project=self.temp_table_reference.projectId,
-              dataset=self.temp_table_reference.datasetId,
-              table=self.temp_table_reference.tableId,
-              selected_fields=['number']))
-      assert_that(result, equal_to(EXPECTED_TABLE_DATA))
-
-  def test_iobase_source_with_row_restriction(self):
-    EXPECTED_TABLE_DATA = [{
-        'number': 3, 'str': u'你好'
-    }, {
-        'number': 4, 'str': u'привет'
-    }]
-    with beam.Pipeline(argv=self.args) as p:
-      result = (
-          p | 'Read with BigQuery Storage API' >> beam.io.ReadFromBigQuery(
-              method=beam.io.ReadFromBigQuery.Method.DIRECT_READ,
-              project=self.temp_table_reference.projectId,
-              dataset=self.temp_table_reference.datasetId,
-              table=self.temp_table_reference.tableId,
-              row_restriction='number > 2'))
-      assert_that(result, equal_to(EXPECTED_TABLE_DATA))
-
-  def test_iobase_source_with_column_selection_and_row_restriction(self):
-    EXPECTED_TABLE_DATA = [{'str': u'你好'}, {'str': u'привет'}]
-    with beam.Pipeline(argv=self.args) as p:
-      result = (
-          p | 'Read with BigQuery Storage API' >> beam.io.ReadFromBigQuery(
-              method=beam.io.ReadFromBigQuery.Method.DIRECT_READ,
-              project=self.temp_table_reference.projectId,
-              dataset=self.temp_table_reference.datasetId,
-              table=self.temp_table_reference.tableId,
-              selected_fields=['str'],
-              row_restriction='number > 2'))
-      assert_that(result, equal_to(EXPECTED_TABLE_DATA))
-
-  def test_iobase_source_with_very_selective_filters(self):
-    with beam.Pipeline(argv=self.args) as p:
-      result = (
-          p | 'Read with BigQuery Storage API' >> beam.io.ReadFromBigQuery(
-              method=beam.io.ReadFromBigQuery.Method.DIRECT_READ,
-              project=self.temp_table_reference.projectId,
-              dataset=self.temp_table_reference.datasetId,
-              table=self.temp_table_reference.tableId,
-              selected_fields=['str'],
-              row_restriction='number > 4'))
-      assert_that(result, equal_to([]))
-
-
 class ReadNewTypesTests(BigQueryReadIntegrationTests):
   @classmethod
   def setUpClass(cls):
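
The deleted ReadUsingStorageApiTests exercised the DIRECT_READ arguments
removed above; the surviving ReadTests keep the same pipeline-and-assertion
shape without them. A hedged sketch of that retained pattern (the names and
expected rows are placeholders):

    # Export-path read test shape; assert_that/equal_to are Beam's
    # testing utilities.
    import apache_beam as beam
    from apache_beam.testing.util import assert_that
    from apache_beam.testing.util import equal_to

    TABLE_DATA = [{'number': 1, 'str': 'abc'}, {'number': 2, 'str': 'def'}]

    with beam.Pipeline() as p:
        result = (
            p | 'ReadTable' >> beam.io.ReadFromBigQuery(
                project='my-project',
                dataset='my_dataset',
                table='python_read_table'))
        assert_that(result, equal_to(TABLE_DATA))
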
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index f4e02b8..170fbff 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -103,7 +103,6 @@ if StrictVersion(_PIP_VERSION) < StrictVersion(REQUIRED_PIP_VERSION):
       )
   )
 
-
 REQUIRED_CYTHON_VERSION = '0.28.1'
 try:
   _CYTHON_VERSION = get_distribution('cython').version
@@ -155,7 +154,7 @@ REQUIRED_PACKAGES = [
     'pytz>=2018.3',
     'requests>=2.24.0,<3.0.0',
     'typing-extensions>=3.7.0,<4',
-    ]
+]
 
 # [BEAM-8181] pyarrow cannot be installed on 32-bit Windows platforms.
 if sys.platform == 'win32' and sys.maxsize <= 2**32:
@@ -178,7 +177,7 @@ REQUIRED_TEST_PACKAGES = [
     'sqlalchemy>=1.3,<2.0',
     'psycopg2-binary>=2.8.5,<3.0.0',
     'testcontainers>=3.0.3,<4.0.0',
-    ]
+]
 
 GCP_REQUIREMENTS = [
     'cachetools>=3.1.0,<5',
@@ -186,7 +185,6 @@ GCP_REQUIREMENTS = [
     'google-auth>=1.18.0,<2',
     'google-cloud-datastore>=1.8.0,<2',
     'google-cloud-pubsub>=0.39.0,<2',
-    'google-cloud-bigquery-storage>=2.4.0',
     # GCP packages required by tests
     'google-cloud-bigquery>=1.6.0,<3',
     'google-cloud-core>=0.28.1,<2',
@@ -223,9 +221,7 @@ INTERACTIVE_BEAM_TEST = [
     'pillow>=7.1.1,<8',
 ]
 
-AWS_REQUIREMENTS = [
-    'boto3 >=1.9'
-]
+AWS_REQUIREMENTS = ['boto3 >=1.9']
 
 AZURE_REQUIREMENTS = [
     'azure-storage-blob >=12.3.2',
@@ -233,7 +229,6 @@ AZURE_REQUIREMENTS = [
 ]
 
 
-
 # We must generate protos after setup_requires are installed.
 def generate_protos_first(original_cmd):
   try:
@@ -245,6 +240,7 @@ def generate_protos_first(original_cmd):
       def run(self):
         gen_protos.generate_proto_files()
         super(cmd, self).run()
+
     return cmd
   except ImportError:
     warnings.warn("Could not import gen_protos, skipping proto generation.")
@@ -256,8 +252,8 @@ python_requires = '>=3.6'
 if sys.version_info.major == 3 and sys.version_info.minor >= 9:
   warnings.warn(
       'This version of Apache Beam has not been sufficiently tested on '
-      'Python %s.%s. You may encounter bugs or missing features.' % (
-          sys.version_info.major, sys.version_info.minor))
+      'Python %s.%s. You may encounter bugs or missing features.' %
+      (sys.version_info.major, sys.version_info.minor))
 
 setuptools.setup(
     name=PACKAGE_NAME,
@@ -269,9 +265,18 @@ setuptools.setup(
     author=PACKAGE_AUTHOR,
     author_email=PACKAGE_EMAIL,
     packages=setuptools.find_packages(),
-    package_data={'apache_beam': [
-        '*/*.pyx', '*/*/*.pyx', '*/*.pxd', '*/*/*.pxd', '*/*.h', '*/*/*.h',
-        'testing/data/*.yaml', 'portability/api/*.yaml']},
+    package_data={
+        'apache_beam': [
+            '*/*.pyx',
+            '*/*/*.pyx',
+            '*/*.pxd',
+            '*/*/*.pxd',
+            '*/*.h',
+            '*/*/*.h',
+            'testing/data/*.yaml',
+            'portability/api/*.yaml'
+        ]
+    },
     ext_modules=cythonize([
         # Make sure to use language_level=3 cython directive in files below.
         'apache_beam/**/*.pyx',