Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2019/11/12 03:10:01 UTC

[jira] [Work logged] (BEAM-7246) Create a Spanner IO for Python

     [ https://issues.apache.org/jira/browse/BEAM-7246?focusedWorklogId=341629&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-341629 ]

ASF GitHub Bot logged work on BEAM-7246:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Nov/19 03:09
            Start Date: 12/Nov/19 03:09
    Worklog Time Spent: 10m 
      Work Description: udim commented on pull request #9606: [BEAM-7246] Add Google Spanner IO Read on Python SDK
URL: https://github.com/apache/beam/pull/9606#discussion_r343277819
 
 

 ##########
 File path: sdks/python/apache_beam/io/gcp/spannerio.py
 ##########
 @@ -0,0 +1,531 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Google Cloud Spanner IO
+
+This is an experimental module for reading and writing data from Google Cloud
+Spanner. Visit: https://cloud.google.com/spanner for more details.
+
+To read from Cloud Spanner, apply the ReadFromSpanner transform. It returns a
+PCollection, where each element represents an individual row returned from
+the read operation. Both the Query and Read APIs are supported.
+
+ReadFromSpanner relies on the ReadOperation objects exposed by the SpannerIO
+API. A ReadOperation holds the immutable data needed to execute batch and
+naive reads on Cloud Spanner, which makes composing reads more convenient.
+
+ReadFromSpanner reads from Cloud Spanner when given either a 'sql' param in
+the constructor or a 'table' name with a list of 'columns'. For example::
+
+  records = (pipeline
+             | ReadFromSpanner(PROJECT_ID, INSTANCE_ID, DB_NAME,
+                               sql='Select * from users'))
+
+  records = (pipeline
+             | ReadFromSpanner(PROJECT_ID, INSTANCE_ID, DB_NAME,
+                               table='users',
+                               columns=['id', 'name', 'email']))
+
+You can also perform multiple reads by providing a list of ReadOperations to
+the ReadFromSpanner transform constructor. ReadOperation exposes two factory
+methods: use 'query' to perform SQL-based reads and 'table' to read from a
+table. For example::
+
+  read_operations = [
+                      ReadOperation.table('customers', ['name', 'email']),
+                      ReadOperation.table('vendors', ['name', 'email']),
+                    ]
+  all_users = pipeline | ReadFromSpanner(PROJECT_ID, INSTANCE_ID, DB_NAME,
+        read_operations=read_operations)
+
+  ...OR...
+
+  read_operations = [
+                      ReadOperation.sql('Select name, email from customers'),
+                      ReadOperation.table('vendors', ['name', 'email']),
+                    ]
+  all_users = pipeline | ReadFromSpanner(PROJECT_ID, INSTANCE_ID, DB_NAME,
+        read_operations=read_operations)
+
+For more information, please review the docs on class ReadOperation.
+
+Users can also provide the ReadOperations in the form of a PCollection via the
+pipeline. For example::
+
+  users = (pipeline
+           | beam.Create([ReadOperation...])
+           | ReadFromSpanner(PROJECT_ID, INSTANCE_ID, DB_NAME))
+
+Users may also create a Cloud Spanner transaction with the transform called
+`create_transaction`, which is available in the SpannerIO API.
+
+The transform is guaranteed to be executed on a consistent snapshot of data,
+utilizing the power of read-only transactions. Staleness of data can be
+controlled by providing the `read_timestamp` or `exact_staleness` param values
+in the constructor.
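+
+For example, a sketch of a bounded-staleness read (the 15-second window is
+illustrative)::
+
+  import datetime
+
+  records = (pipeline
+             | ReadFromSpanner(PROJECT_ID, INSTANCE_ID, DB_NAME,
+                               sql='Select * from users',
+                               exact_staleness=datetime.timedelta(seconds=15)))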
+
+This transform requires the root of the pipeline (PBegin) and returns a dict
+containing 'session_id' and 'transaction_id'. The `create_transaction`
+PTransform is later passed to the constructor of ReadFromSpanner. For
+example::
+
+  transaction = (pipeline | create_transaction(PROJECT_ID,
+                                               INSTANCE_ID,
+                                               DB_NAME))
+
+  users = pipeline | ReadFromSpanner(PROJECT_ID, INSTANCE_ID, DB_NAME,
+        sql='Select * from users', transaction=transaction)
+
+  tweets = pipeline | ReadFromSpanner(PROJECT_ID, INSTANCE_ID, DB_NAME,
+        sql='Select * from tweets', transaction=transaction)
+
+For further details of this transform, please review the docs on the
+`create_transaction` method available in the SpannerIO API.
+
+ReadFromSpanner takes this transform in its constructor and passes it to the
+read pipeline as a singleton side input.
+"""
+from __future__ import absolute_import
+
+from collections import namedtuple
+
+from google.cloud.spanner import Client
+from google.cloud.spanner import KeySet
+from google.cloud.spanner_v1.database import BatchSnapshot
+
+from apache_beam import Create
+from apache_beam import DoFn
+from apache_beam import ParDo
+from apache_beam import Reshuffle
+from apache_beam.pvalue import AsSingleton
+from apache_beam.pvalue import PBegin
+from apache_beam.transforms import PTransform
+from apache_beam.transforms import ptransform_fn
+from apache_beam.transforms.display import DisplayDataItem
+from apache_beam.utils.annotations import experimental
+
+__all__ = ['create_transaction', 'ReadFromSpanner', 'ReadOperation']
+
+
+class ReadOperation(namedtuple("ReadOperation",
+                               ["read_operation", "batch_action",
+                                "transaction_action", "kwargs"])):
+  """
+  Encapsulates a Spanner read operation.
+  """
+
+  __slots__ = ()
+
+  @classmethod
+  def query(cls, sql, params=None, param_types=None):
+    """
+    A convenience method to construct a ReadOperation from a SQL query.
+
+    Args:
+      sql: SQL query statement.
+      params: (optional) values for parameter replacement. Keys must match the
+        names used in sql.
+      param_types: (optional) maps explicit types for one or more param values;
+        required if parameters are passed.
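+
+    Example (a sketch; the table, parameter name, and value are illustrative,
+    and `param_types` comes from the `google.cloud.spanner` package)::
+
+      from google.cloud.spanner import param_types
+
+      op = ReadOperation.query(
+          sql='SELECT * FROM users WHERE id = @user_id',
+          params={'user_id': 123},
+          param_types={'user_id': param_types.INT64})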
+    """
+
+    if params:
+      assert param_types is not None
+
+    return cls(
+        read_operation="process_query_batch",
+        batch_action="generate_query_batches", transaction_action="execute_sql",
+        kwargs={'sql': sql, 'params': params, 'param_types': param_types}
+    )
+
+  @classmethod
+  def table(cls, table, columns, index="", keyset=None):
+    """
+    A convenience method to construct a ReadOperation from a table read.
+
+    Args:
+      table: name of the table from which to fetch data.
+      columns: names of columns to be retrieved.
+      index: (optional) name of index to use, rather than the table's primary
+        key.
+      keyset: (optional) `KeySet` keys / ranges identifying rows to be
+        retrieved.
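+
+    Example (a sketch; the table, columns, and key values are illustrative,
+    and `KeySet` comes from the `google.cloud.spanner` package)::
+
+      from google.cloud.spanner import KeySet
+
+      op = ReadOperation.table(
+          table='users',
+          columns=['id', 'name', 'email'],
+          keyset=KeySet(keys=[[1], [2], [3]]))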
+    """
+    keyset = keyset or KeySet(all_=True)
+    if not isinstance(keyset, KeySet):
+      raise ValueError("keyset must be an instance of class "
+                       "google.cloud.spanner_v1.keyset.KeySet")
+    return cls(
+        read_operation="process_read_batch",
+        batch_action="generate_read_batches", transaction_action="read",
+        kwargs={'table': table, 'columns': columns, 'index': index,
+                'keyset': keyset}
+    )
+
+
+class _BeamSpannerConfiguration(namedtuple(
+    "_BeamSpannerConfiguration", ["project", "instance", "database",
+                                  "credentials", "pool",
+                                  "snapshot_read_timestamp",
+                                  "snapshot_exact_staleness"])):
+  """
+  Holds the immutable configuration used to connect to Cloud Spanner.
+  """
+
+  @property
+  def snapshot_options(self):
+    snapshot_options = {}
+    if self.snapshot_exact_staleness:
+      snapshot_options['exact_staleness'] = self.snapshot_exact_staleness
+    if self.snapshot_read_timestamp:
+      snapshot_options['read_timestamp'] = self.snapshot_read_timestamp
+    return snapshot_options
+
+
+class _NaiveSpannerReadDoFn(DoFn):
+
+  def __init__(self, spanner_configuration):
+    """
+    A naive version of the Spanner read that uses the transaction API of
+    Cloud Spanner.
+    https://googleapis.dev/python/spanner/latest/transaction-api.html
+
+    Args:
+      spanner_configuration: (_BeamSpannerConfiguration) Connection details to
+        connect with cloud spanner.
+    """
+    self._spanner_configuration = spanner_configuration
+    self._snapshot = None
+
+  def to_runner_api_parameter(self, context):
+    return self.to_runner_api_pickled(context)
+
+  def setup(self):
+    # Set up the client to connect with Cloud Spanner.
+    spanner_client = Client(self._spanner_configuration.project)
+    instance = spanner_client.instance(self._spanner_configuration.instance)
+    self._database = instance.database(self._spanner_configuration.database,
+                                       pool=self._spanner_configuration.pool)
+
+  def process(self, element, transaction_info):
+    # We use a batch snapshot to reuse the same transaction passed through
+    # the side input.
+    self._snapshot = BatchSnapshot.from_dict(self._database, transaction_info)
+
+    # Get the transaction from the snapshot's session to run the read
+    # operation.
+    with self._snapshot._get_session().transaction() as transaction:
+      if element.transaction_action == 'execute_sql':
+        transaction_read = transaction.execute_sql
+      elif element.transaction_action == 'read':
+        transaction_read = transaction.read
+      else:
+        raise ValueError(
+            "Unknown transaction action: %s" % element.transaction_action)
+
+      for row in transaction_read(**element.kwargs):
+        yield row
+
+  def teardown(self):
+    # close the connection
+    if self._snapshot:
+      self._snapshot.close()
+
+
+class _CreateReadPartitions(DoFn):
+  """
+  A DoFn to create partitions. Uses the Partitioning API (PartitionRead /
+  PartitionQuery) to start a partitioned query operation. Returns a list of
+  batch information needed to perform the actual queries.
+
+  If the element is an instance of :class:`ReadOperation` performing a SQL
+  query, the `PartitionQuery` API is used to create partitions, returning
+  mappings of the information needed to perform the actual partitioned reads
+  via :meth:`process_query_batch`.
+
+  If the element is an instance of :class:`ReadOperation` performing a read
+  from a table, the `PartitionRead` API is used to create partitions,
+  returning mappings of the information needed to perform the actual
+  partitioned reads via :meth:`process_read_batch`.
+  """
+
+  def __init__(self, spanner_configuration):
+    self._spanner_configuration = spanner_configuration
+
+  def to_runner_api_parameter(self, context):
+    return self.to_runner_api_pickled(context)
+
+  def setup(self):
+    spanner_client = Client(project=self._spanner_configuration.project,
+                            credentials=self._spanner_configuration.credentials)
+    instance = spanner_client.instance(self._spanner_configuration.instance)
+    self._database = instance.database(self._spanner_configuration.database,
+                                       pool=self._spanner_configuration.pool)
+    self._snapshot = self._database.batch_snapshot(**self._spanner_configuration
+                                                   .snapshot_options)
+    self._snapshot_dict = self._snapshot.to_dict()
+
+  def process(self, element):
+    if element.batch_action == 'generate_query_batches':
+      partitioning_action = self._snapshot.generate_query_batches
+    elif element.batch_action == 'generate_read_batches':
+      partitioning_action = self._snapshot.generate_read_batches
+    else:
+      raise ValueError("Unknown batch action: %s" % element.batch_action)
+
+    for p in partitioning_action(**element.kwargs):
+      yield {"read_operation": element.read_operation, "partitions": p,
+             "transaction_info": self._snapshot_dict}
+
+  def teardown(self):
+    if self._snapshot:
+      self._snapshot.close()
+
+
+class _CreateTransactionFn(DoFn):
+  """
+  A DoFn to create a Cloud Spanner transaction.
+  It connects to the database and returns the transaction_id and session_id
+  by using the batch_snapshot.to_dict() method available in the Google Cloud
+  Spanner SDK.
+
+  https://googleapis.dev/python/spanner/latest/database-api.html#google.cloud.spanner_v1.database.BatchSnapshot.to_dict
+  """
+
+  def __init__(self, project_id, instance_id, database_id, credentials,
+               pool, read_timestamp,
+               exact_staleness):
+    self._project_id = project_id
+    self._instance_id = instance_id
+    self._database_id = database_id
+    self._credentials = credentials
+    self._pool = pool
+
+    self._snapshot_options = {}
+    if read_timestamp:
+      self._snapshot_options['read_timestamp'] = read_timestamp
+    if exact_staleness:
+      self._snapshot_options['exact_staleness'] = exact_staleness
+    self._snapshot = None
+
+  def to_runner_api_parameter(self, context):
+    return self.to_runner_api_pickled(context)
+
+  def setup(self):
+    self._spanner_client = Client(project=self._project_id,
+                                  credentials=self._credentials)
+    self._instance = self._spanner_client.instance(self._instance_id)
+    self._database = self._instance.database(self._database_id, pool=self._pool)
+
+  def process(self, element, *args, **kwargs):
+    self._snapshot = self._database.batch_snapshot(**self._snapshot_options)
+    return [self._snapshot.to_dict()]
+
+  def teardown(self):
+    if self._snapshot:
+      self._snapshot.close()
+
+
+@ptransform_fn
+def create_transaction(pbegin, project_id, instance_id, database_id,
+                       credentials=None, pool=None, read_timestamp=None,
+                       exact_staleness=None):
+  """
+  A PTransform method to create a batch transaction.
+
+  Args:
+    pbegin: Root of the pipeline
+    project_id: Cloud spanner project id. Be sure to use the Project ID,
+      not the Project Number.
+    instance_id: Cloud spanner instance id.
+    database_id: Cloud spanner database id.
+    credentials: (optional) The authorization credentials to attach to requests.
+      These credentials identify this application to the service.
+      If none are specified, the client will attempt to ascertain
+      the credentials from the environment.
+    pool: (optional) session pool to be used by database. If not passed, the
+      database will construct an instance of BurstyPool.
+    read_timestamp: (optional) An instance of the `datetime.datetime` object to
+      execute all reads at the given timestamp.
+    exact_staleness: (optional) An instance of the `datetime.timedelta` object
+      to execute all reads at a timestamp that is exact_staleness old.
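+
+  Example (a sketch; the 15-second staleness window is illustrative)::
+
+    import datetime
+
+    transaction = pipeline | create_transaction(
+        PROJECT_ID, INSTANCE_ID, DB_NAME,
+        exact_staleness=datetime.timedelta(seconds=15))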
+  """
+
+  assert isinstance(pbegin, PBegin)
+
+  return (pbegin | Create([1]) | ParDo(_CreateTransactionFn(
+      project_id, instance_id, database_id, credentials,
+      pool, read_timestamp,
+      exact_staleness)))
+
+
+class _ReadFromPartitionFn(DoFn):
+  """
+  A DoFn to perform reads from the partition.
+  """
+
+  def __init__(self, spanner_configuration):
+    self._spanner_configuration = spanner_configuration
+
+  def to_runner_api_parameter(self, context):
+    return self.to_runner_api_pickled(context)
+
+  def setup(self):
+    spanner_client = Client(self._spanner_configuration.project)
+    instance = spanner_client.instance(self._spanner_configuration.instance)
+    self._database = instance.database(self._spanner_configuration.database,
+                                       pool=self._spanner_configuration.pool)
+    self._snapshot = self._database.batch_snapshot(**self._spanner_configuration
+                                                   .snapshot_options)
+
+  def process(self, element):
+    self._snapshot = BatchSnapshot.from_dict(
+        self._database,
+        element['transaction_info']
+    )
+
+    if element['read_operation'] == 'process_query_batch':
+      read_action = self._snapshot.process_query_batch
+    elif element['read_operation'] == 'process_read_batch':
+      read_action = self._snapshot.process_read_batch
+    else:
+      raise ValueError(
+          "Unknown read action: %s" % element['read_operation'])
+
+    for row in read_action(element['partitions']):
+      yield row
+
+  def teardown(self):
+    if self._snapshot:
+      self._snapshot.close()
+
+
+@experimental()
+class ReadFromSpanner(PTransform):
+  """
+  A PTransform to perform reads from Cloud Spanner. ReadFromSpanner uses the
+  Batch API to perform all read operations.
+  """
+
+  def __init__(self, project_id, instance_id, database_id, pool=None,
+               read_timestamp=None, exact_staleness=None, credentials=None,
+               sql=None, params=None, param_types=None,  # with_query
+               table=None, columns=None, index="", keyset=None,  # with_table
+               read_operations=None,  # for read all
+               transaction=None
+              ):
+    """
+    A PTransform that uses the Spanner Batch API to perform reads.
+
+    Args:
+      project_id: Cloud spanner project id. Be sure to use the Project ID,
+        not the Project Number.
+      instance_id: Cloud spanner instance id.
+      database_id: Cloud spanner database id.
+      pool: (optional) session pool to be used by database. If not passed, the
+        database will construct an instance of BurstyPool.
+      read_timestamp: (optional) An instance of the `datetime.datetime` object
+        to execute all reads at the given timestamp.
+      exact_staleness: (optional) And instance of `datetime.timedelta` object
 
 Review comment:
   s/And/An/
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 341629)
    Time Spent: 20m  (was: 10m)

> Create a Spanner IO for Python
> ------------------------------
>
>                 Key: BEAM-7246
>                 URL: https://issues.apache.org/jira/browse/BEAM-7246
>             Project: Beam
>          Issue Type: Bug
>          Components: io-py-gcp
>            Reporter: Reuven Lax
>            Assignee: Shehzaad Nakhoda
>            Priority: Major
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> Add I/O support for Google Cloud Spanner for the Python SDK (Batch Only).
> Testing in this work item will be in the form of DirectRunner tests and manual testing.
> Integration and performance tests are a separate work item (not included here).
> See https://beam.apache.org/documentation/io/built-in/. The goal is to add Google Cloud Spanner to the Database column for the Python/Batch row.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)