You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2021/08/20 18:49:57 UTC

[beam] branch master updated: [BEAM-10917] Add support for BigQuery Read API in Python BEAM (#15185)

This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2fd9cd1  [BEAM-10917] Add support for BigQuery Read API in Python BEAM (#15185)
2fd9cd1 is described below

commit 2fd9cd1a5077f5370612922da723aa82cd78a636
Author: vachan-shetty <52...@users.noreply.github.com>
AuthorDate: Fri Aug 20 14:48:59 2021 -0400

    [BEAM-10917] Add support for BigQuery Read API in Python BEAM (#15185)
    
    * Adding support for reading from BigQuery ReadAPI in Python BEAM.
    
    * Formatting fixes.
    
    * Fixing lint errors.
    
    * Adding singleton comparison.
    
    * Some more lint fixes.
    
    * Doc fixes.
    
    * Updating docstring about DATETIME handling in fastavro.
    
    * Actually making fastavro the default and some minor fixes.
    
    * Updating 'estimate_size()'. This should improve AutoScaling.
    
    * Renaming use_fastavro flag.
    
    * Updating Docs.
    
    * Fix for failing pre-commit tests and some other fixes.
---
 sdks/python/apache_beam/io/gcp/bigquery.py         | 344 ++++++++++++++++++++-
 .../apache_beam/io/gcp/bigquery_read_it_test.py    | 161 +++++++++-
 sdks/python/setup.py                               |   1 +
 3 files changed, 494 insertions(+), 12 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 38d341c..1f351ab 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -270,6 +270,7 @@ encoding when writing to BigQuery.
 # pytype: skip-file
 
 import collections
+import io
 import itertools
 import json
 import logging
@@ -277,13 +278,20 @@ import random
 import time
 import uuid
 from typing import Dict
+from typing import List
+from typing import Optional
 from typing import Union
 
+import avro.schema
+import fastavro
+from avro import io as avroio
+
 import apache_beam as beam
 from apache_beam import coders
 from apache_beam import pvalue
 from apache_beam.internal.gcp.json_value import from_json_value
 from apache_beam.internal.gcp.json_value import to_json_value
+from apache_beam.io import range_trackers
 from apache_beam.io.avroio import _create_avro_source as create_avro_source
 from apache_beam.io.filesystems import CompressionTypes
 from apache_beam.io.filesystems import FileSystems
@@ -324,6 +332,7 @@ from apache_beam.utils.annotations import experimental
 try:
   from apache_beam.io.gcp.internal.clients.bigquery import DatasetReference
   from apache_beam.io.gcp.internal.clients.bigquery import TableReference
+  import google.cloud.bigquery_storage_v1 as bq_storage
 except ImportError:
   DatasetReference = None
   TableReference = None
@@ -794,7 +803,8 @@ class _CustomBigQuerySource(BoundedSource):
         bq.clean_up_temporary_dataset(self._get_project())
 
     for source in self.split_result:
-      yield SourceBundle(1.0, source, None, None)
+      yield SourceBundle(
+          weight=1.0, source=source, start_position=None, stop_position=None)
 
   def get_range_tracker(self, start_position, stop_position):
     class CustomBigQuerySourceRangeTracker(RangeTracker):
@@ -883,6 +893,276 @@ class _CustomBigQuerySource(BoundedSource):
     return table.schema, metadata_list
 
 
+class _CustomBigQueryStorageSourceBase(BoundedSource):
+  """A base class for BoundedSource implementations which read from BigQuery
+  using the BigQuery Storage API.
+
+  Args:
+    table (str, TableReference): The ID of the table. The ID must contain only
+      letters ``a-z``, ``A-Z``, numbers ``0-9``, or underscores ``_``  If
+      **dataset** argument is :data:`None` then the table argument must
+      contain the entire table reference specified as:
+      ``'PROJECT:DATASET.TABLE'`` or must specify a TableReference.
+    dataset (str): Optional ID of the dataset containing this table or
+      :data:`None` if the table argument specifies a TableReference.
+    project (str): Optional ID of the project containing this table or
+      :data:`None` if the table argument specifies a TableReference.
+    selected_fields (List[str]): Optional List of names of the fields in the
+      table that should be read. If empty, all fields will be read. If the
+      specified field is a nested field, all the sub-fields in the field will be
+      selected. The output field order is unrelated to the order of fields in
+      selected_fields.
+    row_restriction (str): Optional SQL text filtering statement, similar to a
+      WHERE clause in a query. Aggregates are not supported. Restricted to a
+      maximum length for 1 MB.
+  """
+
+  # The maximum number of streams which will be requested when creating a read
+  # session, regardless of the desired bundle size.
+  MAX_SPLIT_COUNT = 10000
+  # The minimum number of streams which will be requested when creating a read
+  # session, regardless of the desired bundle size. Note that the server may
+  # still choose to return fewer than ten streams based on the layout of the
+  # table.
+  MIN_SPLIT_COUNT = 10
+
+  def __init__(
+      self,
+      table: Union[str, TableReference],
+      dataset: Optional[str] = None,
+      project: Optional[str] = None,
+      selected_fields: Optional[List[str]] = None,
+      row_restriction: Optional[str] = None,
+      use_fastavro_for_direct_read: Optional[bool] = None,
+      pipeline_options: Optional[GoogleCloudOptions] = None):
+
+    self.table_reference = bigquery_tools.parse_table_reference(
+        table, dataset, project)
+    self.table = self.table_reference.tableId
+    self.dataset = self.table_reference.datasetId
+    self.project = self.table_reference.projectId
+    self.selected_fields = selected_fields
+    self.row_restriction = row_restriction
+    self.use_fastavro = \
+      True if use_fastavro_for_direct_read is None else \
+      use_fastavro_for_direct_read
+    self.pipeline_options = pipeline_options
+    self.split_result = None
+
+  def _get_parent_project(self):
+    """Returns the project that will be billed."""
+    project = self.pipeline_options.view_as(GoogleCloudOptions).project
+    if isinstance(project, vp.ValueProvider):
+      project = project.get()
+    if not project:
+      project = self.project
+    return project
+
+  def _get_table_size(self, table, dataset, project):
+    if project is None:
+      project = self._get_parent_project()
+
+    bq = bigquery_tools.BigQueryWrapper()
+    table = bq.get_table(project, dataset, table)
+    return table.numBytes
+
+  def display_data(self):
+    return {
+        'project': str(self.project),
+        'dataset': str(self.dataset),
+        'table': str(self.table),
+        'selected_fields': str(self.selected_fields),
+        'row_restriction': str(self.row_restriction),
+        'use_fastavro': str(self.use_fastavro)
+    }
+
+  def estimate_size(self):
+    # Returns the pre-filtering size of the table being read.
+    return self._get_table_size(self.table, self.dataset, self.project)
+
+  def split(self, desired_bundle_size, start_position=None, stop_position=None):
+    requested_session = bq_storage.types.ReadSession()
+    requested_session.table = 'projects/{}/datasets/{}/tables/{}'.format(
+        self.project, self.dataset, self.table)
+    requested_session.data_format = bq_storage.types.DataFormat.AVRO
+    if self.selected_fields is not None:
+      requested_session.read_options.selected_fields = self.selected_fields
+    if self.row_restriction is not None:
+      requested_session.read_options.row_restriction = self.row_restriction
+
+    storage_client = bq_storage.BigQueryReadClient()
+    stream_count = 0
+    if desired_bundle_size > 0:
+      table_size = self._get_table_size(self.table, self.dataset, self.project)
+      stream_count = min(
+          int(table_size / desired_bundle_size),
+          _CustomBigQueryStorageSourceBase.MAX_SPLIT_COUNT)
+    stream_count = max(
+        stream_count, _CustomBigQueryStorageSourceBase.MIN_SPLIT_COUNT)
+
+    parent = 'projects/{}'.format(self.project)
+    read_session = storage_client.create_read_session(
+        parent=parent,
+        read_session=requested_session,
+        max_stream_count=stream_count)
+    _LOGGER.info(
+        'Sent BigQuery Storage API CreateReadSession request: \n %s \n'
+        'Received response \n %s.',
+        requested_session,
+        read_session)
+
+    self.split_result = [
+        _CustomBigQueryStorageStreamSource(stream.name, self.use_fastavro)
+        for stream in read_session.streams
+    ]
+
+    for source in self.split_result:
+      yield SourceBundle(
+          weight=1.0, source=source, start_position=None, stop_position=None)
+
+  def get_range_tracker(self, start_position, stop_position):
+    class NonePositionRangeTracker(RangeTracker):
+      """A RangeTracker that always returns positions as None. Prevents the
+      BigQuery Storage source from being read() before being split()."""
+      def start_position(self):
+        return None
+
+      def stop_position(self):
+        return None
+
+    return NonePositionRangeTracker()
+
+  def read(self, range_tracker):
+    raise NotImplementedError(
+        'BigQuery storage source must be split before being read')
+
+
+class _CustomBigQueryStorageStreamSource(BoundedSource):
+  """A source representing a single stream in a read session."""
+  def __init__(self, read_stream_name: str, use_fastavro: bool):
+    self.read_stream_name = read_stream_name
+    self.use_fastavro = use_fastavro
+
+  def display_data(self):
+    return {
+        'read_stream': str(self.read_stream_name),
+    }
+
+  def estimate_size(self):
+    # The size of stream source cannot be estimate due to server-side liquid
+    # sharding.
+    # TODO: Implement progress reporting.
+    return None
+
+  def split(self, desired_bundle_size, start_position=None, stop_position=None):
+    # A stream source can't be split without reading from it due to
+    # server-side liquid sharding. A split will simply return the current source
+    # for now.
+    return SourceBundle(
+        weight=1.0,
+        source=_CustomBigQueryStorageStreamSource(
+            self.read_stream_name, self.use_fastavro),
+        start_position=None,
+        stop_position=None)
+
+  def get_range_tracker(self, start_position, stop_position):
+    # TODO: Implement dynamic work rebalancing.
+    assert start_position is None
+    # Defaulting to the start of the stream.
+    start_position = 0
+    # Since the streams are unsplittable we choose OFFSET_INFINITY as the
+    # default end offset so that all data of the source gets read.
+    stop_position = range_trackers.OffsetRangeTracker.OFFSET_INFINITY
+    range_tracker = range_trackers.OffsetRangeTracker(
+        start_position, stop_position)
+    # Ensuring that all try_split() calls will be ignored by the Rangetracker.
+    range_tracker = range_trackers.UnsplittableRangeTracker(range_tracker)
+
+    return range_tracker
+
+  def read(self, range_tracker):
+    _LOGGER.info(
+        "Started BigQuery Storage API read from stream %s.",
+        self.read_stream_name)
+    storage_client = bq_storage.BigQueryReadClient()
+    read_rows_iterator = iter(storage_client.read_rows(self.read_stream_name))
+    # Handling the case where the user might provide very selective filters
+    # which can result in read_rows_response being empty.
+    first_read_rows_response = next(read_rows_iterator, None)
+    if first_read_rows_response is None:
+      return iter([])
+
+    if self.use_fastavro:
+      row_reader = _ReadRowsResponseReaderWithFastAvro(
+          read_rows_iterator, first_read_rows_response)
+      return iter(row_reader)
+
+    row_reader = _ReadRowsResponseReader(
+        read_rows_iterator, first_read_rows_response)
+    return iter(row_reader)
+
+
+class _ReadRowsResponseReaderWithFastAvro():
+  """An iterator that deserializes ReadRowsResponses using the fastavro
+  library."""
+  def __init__(self, read_rows_iterator, read_rows_response):
+    self.read_rows_iterator = read_rows_iterator
+    self.read_rows_response = read_rows_response
+    self.avro_schema = fastavro.parse_schema(
+        json.loads(self.read_rows_response.avro_schema.schema))
+    self.bytes_reader = io.BytesIO(
+        self.read_rows_response.avro_rows.serialized_binary_rows)
+
+  def __iter__(self):
+    return self
+
+  def __next__(self):
+    try:
+      return fastavro.schemaless_reader(self.bytes_reader, self.avro_schema)
+    except StopIteration:
+      self.read_rows_response = next(self.read_rows_iterator, None)
+      if self.read_rows_response is not None:
+        self.bytes_reader = io.BytesIO(
+            self.read_rows_response.avro_rows.serialized_binary_rows)
+        return fastavro.schemaless_reader(self.bytes_reader, self.avro_schema)
+      else:
+        raise StopIteration
+
+
+class _ReadRowsResponseReader():
+  """An iterator that deserializes ReadRowsResponses."""
+  def __init__(self, read_rows_iterator, read_rows_response):
+    self.read_rows_iterator = read_rows_iterator
+    self.read_rows_response = read_rows_response
+    self.avro_schema = avro.schema.Parse(
+        self.read_rows_response.avro_schema.schema)
+    self.reader = avroio.DatumReader(self.avro_schema)
+    self.decoder = avroio.BinaryDecoder(
+        io.BytesIO(self.read_rows_response.avro_rows.serialized_binary_rows))
+    self.next_row = 0
+
+  def __iter__(self):
+    return self
+
+  def get_deserialized_row(self):
+    deserialized_row = self.reader.read(self.decoder)
+    self.next_row += 1
+    return deserialized_row
+
+  def __next__(self):
+    if self.next_row < self.read_rows_response.row_count:
+      return self.get_deserialized_row()
+
+    self.read_rows_response = next(self.read_rows_iterator, None)
+    if self.read_rows_response is not None:
+      self.decoder = avroio.BinaryDecoder(
+          io.BytesIO(self.read_rows_response.avro_rows.serialized_binary_rows))
+      self.next_row = 0
+      return self.get_deserialized_row()
+    else:
+      raise StopIteration
+
+
 @deprecated(since='2.11.0', current="WriteToBigQuery")
 class BigQuerySink(dataflow_io.NativeSink):
   """A sink based on a BigQuery table.
@@ -1837,19 +2117,38 @@ bigquery_v2_messages.TableSchema`. or a `ValueProvider` that has a JSON string,
 class ReadFromBigQuery(PTransform):
   """Read data from BigQuery.
 
-    This PTransform uses a BigQuery export job to take a snapshot of the table
-    on GCS, and then reads from each produced file. File format is Avro by
+    This PTransform uses either a BigQuery export job to take a snapshot of the
+    table on GCS, and then reads from each produced file (EXPORT) or reads
+    directly from BigQuery storage using BigQuery Read API (DIRECT_READ). The
+    option is specified using the 'method' :parameter. File format is Avro by
     default.
 
+    NOTE: DIRECT_READ only supports reading from BigQuery Tables currently. To
+    read the results of a query please use EXPORT.
+
+  .. warning::
+      DATETIME columns are parsed as strings in the fastavro library. As a
+      result, such columns will be converted to Python strings instead of native
+      Python DATETIME types.
+
   Args:
+    method: The method to use to read from BigQuery. It may be EXPORT or
+      DIRECT_READ. EXPORT invokes a BigQuery export request
+      (https://cloud.google.com/bigquery/docs/exporting-data). DIRECT_READ reads
+      directly from BigQuery storage using the BigQuery Read API
+      (https://cloud.google.com/bigquery/docs/reference/storage). If
+      unspecified, the default is currently EXPORT.
+    use_fastavro_for_direct_read (bool): If method is `DIRECT_READ` and
+       :data:`True`, the fastavro library is used to deserialize the data
+       received from the BigQuery Read API. The default here is :data:`True`.
     table (str, callable, ValueProvider): The ID of the table, or a callable
       that returns it. The ID must contain only letters ``a-z``, ``A-Z``,
       numbers ``0-9``, or underscores ``_``. If dataset argument is
       :data:`None` then the table argument must contain the entire table
-      reference specified as: ``'DATASET.TABLE'``
-      or ``'PROJECT:DATASET.TABLE'``. If it's a callable, it must receive one
-      argument representing an element to be written to BigQuery, and return
-      a TableReference, or a string table name as specified above.
+      reference specified as: ``'PROJECT:DATASET.TABLE'``.
+      If it's a callable, it must receive one argument representing an element
+      to be written to BigQuery, and return a TableReference, or a string table
+      name as specified above.
     dataset (str): The ID of the dataset containing this table or
       :data:`None` if the table reference is specified entirely by the table
       argument.
@@ -1900,16 +2199,21 @@ class ReadFromBigQuery(PTransform):
       https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro\
               #avro_conversions
     temp_dataset (``apache_beam.io.gcp.internal.clients.bigquery.\
-DatasetReference``):
+        DatasetReference``):
         The dataset in which to create temporary tables when performing file
         loads. By default, a new dataset is created in the execution project for
         temporary tables.
    """
+  class Method(object):
+    EXPORT = 'EXPORT'  #  This is currently the default.
+    DIRECT_READ = 'DIRECT_READ'
 
   COUNTER = 0
 
-  def __init__(self, gcs_location=None, *args, **kwargs):
-    if gcs_location:
+  def __init__(self, gcs_location=None, method=None, *args, **kwargs):
+    self.method = method or ReadFromBigQuery.Method.EXPORT
+
+    if gcs_location and self.method is ReadFromBigQuery.Method.EXPORT:
       if not isinstance(gcs_location, (str, ValueProvider)):
         raise TypeError(
             '%s: gcs_location must be of type string'
@@ -1920,12 +2224,21 @@ DatasetReference``):
         gcs_location = StaticValueProvider(str, gcs_location)
 
     self.gcs_location = gcs_location
-
     self._args = args
     self._kwargs = kwargs
 
   def expand(self, pcoll):
     # TODO(BEAM-11115): Make ReadFromBQ rely on ReadAllFromBQ implementation.
+    if self.method is ReadFromBigQuery.Method.EXPORT:
+      return self._expand_export(pcoll)
+    elif self.method is ReadFromBigQuery.Method.DIRECT_READ:
+      return self._expand_direct_read(pcoll)
+    else:
+      raise ValueError(
+          'The method to read from BigQuery must be either EXPORT'
+          'or DIRECT_READ.')
+
+  def _expand_export(self, pcoll):
     temp_location = pcoll.pipeline.options.view_as(
         GoogleCloudOptions).temp_location
     job_name = pcoll.pipeline.options.view_as(GoogleCloudOptions).job_name
@@ -1960,6 +2273,15 @@ DatasetReference``):
                 **self._kwargs))
         | _PassThroughThenCleanup(files_to_remove_pcoll))
 
+  def _expand_direct_read(self, pcoll):
+    return (
+        pcoll
+        | beam.io.Read(
+            _CustomBigQueryStorageSourceBase(
+                pipeline_options=pcoll.pipeline.options,
+                *self._args,
+                **self._kwargs)))
+
 
 class ReadFromBigQueryRequest:
   """
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
index 472b521..53bf567 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_read_it_test.py
@@ -26,12 +26,14 @@ import logging
 import random
 import time
 import unittest
+import uuid
 from decimal import Decimal
 from functools import wraps
 
 import pytest
 
 import apache_beam as beam
+from apache_beam.io.gcp import bigquery_tools
 from apache_beam.io.gcp.bigquery_tools import BigQueryWrapper
 from apache_beam.io.gcp.internal.clients import bigquery
 from apache_beam.options.value_provider import StaticValueProvider
@@ -128,7 +130,7 @@ class ReadTests(BigQueryReadIntegrationTests):
   @classmethod
   def setUpClass(cls):
     super(ReadTests, cls).setUpClass()
-    cls.table_name = 'python_write_table'
+    cls.table_name = 'python_read_table'
     cls.create_table(cls.table_name)
 
     table_id = '{}.{}'.format(cls.dataset_id, cls.table_name)
@@ -175,6 +177,163 @@ class ReadTests(BigQueryReadIntegrationTests):
       assert_that(result, equal_to(self.TABLE_DATA))
 
 
+class ReadUsingStorageApiTests(BigQueryReadIntegrationTests):
+  TABLE_DATA = [{
+      'number': 1, 'str': 'abc'
+  }, {
+      'number': 2, 'str': 'def'
+  }, {
+      'number': 3, 'str': u'你好'
+  }, {
+      'number': 4, 'str': u'привет'
+  }]
+
+  @classmethod
+  def setUpClass(cls):
+    super(ReadUsingStorageApiTests, cls).setUpClass()
+    cls.table_name = 'python_read_table'
+    cls._create_table(cls.table_name)
+
+    table_id = '{}.{}'.format(cls.dataset_id, cls.table_name)
+    cls.query = 'SELECT number, str FROM `%s`' % table_id
+
+    # Materializing the newly created Table to ensure the Read API can stream.
+    cls.temp_table_reference = cls._execute_query(cls.project, cls.query)
+
+  @classmethod
+  def tearDownClass(cls):
+    cls.bigquery_client.clean_up_temporary_dataset(cls.project)
+    super(ReadUsingStorageApiTests, cls).tearDownClass()
+
+  @classmethod
+  def _create_table(cls, table_name):
+    table_schema = bigquery.TableSchema()
+    table_field = bigquery.TableFieldSchema()
+    table_field.name = 'number'
+    table_field.type = 'INTEGER'
+    table_schema.fields.append(table_field)
+    table_field = bigquery.TableFieldSchema()
+    table_field.name = 'str'
+    table_field.type = 'STRING'
+    table_schema.fields.append(table_field)
+    table = bigquery.Table(
+        tableReference=bigquery.TableReference(
+            projectId=cls.project, datasetId=cls.dataset_id,
+            tableId=table_name),
+        schema=table_schema)
+    request = bigquery.BigqueryTablesInsertRequest(
+        projectId=cls.project, datasetId=cls.dataset_id, table=table)
+    cls.bigquery_client.client.tables.Insert(request)
+    cls.bigquery_client.insert_rows(
+        cls.project, cls.dataset_id, table_name, cls.TABLE_DATA)
+
+  @classmethod
+  def _setup_temporary_dataset(cls, project, query):
+    location = cls.bigquery_client.get_query_location(project, query, False)
+    cls.bigquery_client.create_temporary_dataset(project, location)
+
+  @classmethod
+  def _execute_query(cls, project, query):
+    query_job_name = bigquery_tools.generate_bq_job_name(
+        'materializing_table_before_reading',
+        str(uuid.uuid4())[0:10],
+        bigquery_tools.BigQueryJobTypes.QUERY,
+        '%s_%s' % (int(time.time()), random.randint(0, 1000)))
+    cls._setup_temporary_dataset(cls.project, cls.query)
+    job = cls.bigquery_client._start_query_job(
+        project,
+        query,
+        use_legacy_sql=False,
+        flatten_results=False,
+        job_id=query_job_name)
+    job_ref = job.jobReference
+    cls.bigquery_client.wait_for_bq_job(job_ref, max_retries=0)
+    return cls.bigquery_client._get_temp_table(project)
+
+  def test_iobase_source(self):
+    with beam.Pipeline(argv=self.args) as p:
+      result = (
+          p | 'Read with BigQuery Storage API' >> beam.io.ReadFromBigQuery(
+              method=beam.io.ReadFromBigQuery.Method.DIRECT_READ,
+              project=self.temp_table_reference.projectId,
+              dataset=self.temp_table_reference.datasetId,
+              table=self.temp_table_reference.tableId,
+              use_fastavro_for_direct_read=True))
+      assert_that(result, equal_to(self.TABLE_DATA))
+
+  def test_iobase_source_without_fastavro(self):
+    with beam.Pipeline(argv=self.args) as p:
+      result = (
+          p | 'Read with BigQuery Storage API' >> beam.io.ReadFromBigQuery(
+              method=beam.io.ReadFromBigQuery.Method.DIRECT_READ,
+              project=self.temp_table_reference.projectId,
+              dataset=self.temp_table_reference.datasetId,
+              table=self.temp_table_reference.tableId,
+              use_fastavro_for_direct_read=False))
+      assert_that(result, equal_to(self.TABLE_DATA))
+
+  def test_iobase_source_with_column_selection(self):
+    EXPECTED_TABLE_DATA = [{
+        'number': 1
+    }, {
+        'number': 2
+    }, {
+        'number': 3
+    }, {
+        'number': 4
+    }]
+    with beam.Pipeline(argv=self.args) as p:
+      result = (
+          p | 'Read with BigQuery Storage API' >> beam.io.ReadFromBigQuery(
+              method=beam.io.ReadFromBigQuery.Method.DIRECT_READ,
+              project=self.temp_table_reference.projectId,
+              dataset=self.temp_table_reference.datasetId,
+              table=self.temp_table_reference.tableId,
+              selected_fields=['number']))
+      assert_that(result, equal_to(EXPECTED_TABLE_DATA))
+
+  def test_iobase_source_with_row_restriction(self):
+    EXPECTED_TABLE_DATA = [{
+        'number': 3, 'str': u'你好'
+    }, {
+        'number': 4, 'str': u'привет'
+    }]
+    with beam.Pipeline(argv=self.args) as p:
+      result = (
+          p | 'Read with BigQuery Storage API' >> beam.io.ReadFromBigQuery(
+              method=beam.io.ReadFromBigQuery.Method.DIRECT_READ,
+              project=self.temp_table_reference.projectId,
+              dataset=self.temp_table_reference.datasetId,
+              table=self.temp_table_reference.tableId,
+              row_restriction='number > 2'))
+      assert_that(result, equal_to(EXPECTED_TABLE_DATA))
+
+  def test_iobase_source_with_column_selection_and_row_restriction(self):
+    EXPECTED_TABLE_DATA = [{'str': u'你好'}, {'str': u'привет'}]
+    with beam.Pipeline(argv=self.args) as p:
+      result = (
+          p | 'Read with BigQuery Storage API' >> beam.io.ReadFromBigQuery(
+              method=beam.io.ReadFromBigQuery.Method.DIRECT_READ,
+              project=self.temp_table_reference.projectId,
+              dataset=self.temp_table_reference.datasetId,
+              table=self.temp_table_reference.tableId,
+              selected_fields=['str'],
+              row_restriction='number > 2'))
+      assert_that(result, equal_to(EXPECTED_TABLE_DATA))
+
+  def test_iobase_source_with_very_selective_filters(self):
+    with beam.Pipeline(argv=self.args) as p:
+      result = (
+          p | 'Read with BigQuery Storage API' >> beam.io.ReadFromBigQuery(
+              method=beam.io.ReadFromBigQuery.Method.DIRECT_READ,
+              project=self.temp_table_reference.projectId,
+              dataset=self.temp_table_reference.datasetId,
+              table=self.temp_table_reference.tableId,
+              selected_fields=['str'],
+              row_restriction='number > 4'))
+      assert_that(result, equal_to([]))
+
+
 class ReadNewTypesTests(BigQueryReadIntegrationTests):
   @classmethod
   def setUpClass(cls):
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 5d72a9d..338251d 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -186,6 +186,7 @@ GCP_REQUIREMENTS = [
     'google-auth>=1.18.0,<2',
     'google-cloud-datastore>=1.8.0,<2',
     'google-cloud-pubsub>=0.39.0,<2',
+    'google-cloud-bigquery-storage>=2.4.0',
     # GCP packages required by tests
     'google-cloud-bigquery>=1.6.0,<3',
     'google-cloud-core>=0.28.1,<2',