Posted to user@beam.apache.org by Brendan Wee via user <us...@beam.apache.org> on 2023/03/24 00:01:16 UTC
Python SDK - apache_beam.io.gcp.spanner module - How to model Spanner Schema with Named Tuple
Hello,
I am trying to use the Apache Beam GCP SpannerIO module
<https://beam.apache.org/releases/pydoc/2.30.0/apache_beam.io.gcp.spanner.html>
to run a transformation on our Google Cloud Spanner database, but I am having
trouble creating a NamedTuple to read the data. I have not been able to find
much documentation on how to do this and would love any advice or help
you can provide.
Our spanner table schema is:
[image: Screen Shot 2023-03-23 at 11.29.14 AM.png]
I tried to model this using the following NamedTuple class:
from datetime import datetime
from typing import NamedTuple

from google.cloud.spanner_v1.data_types import JsonObject

class OriginalRow(NamedTuple):
    file_metadata_id: str
    project_id: str
    file_md5sum: str
    bucket_received: str
    file_size: int
    transfer_date: str
    storage_class: str
    user_metadata: JsonObject
    gcs_uri: str
    generation_numbers: JsonObject
    is_deleted: bool
    created_on: datetime
    last_modified: datetime
    gcs_object_name: str
    file_creator: str
    cpp_harmony_plate_name: str
    cpp_experiment_folder: str
    cpp_experiment_id: str
    cpp_compound_layout_id: str
    cpp_well: str
I am trying to read the rows from my spanner table using the following code:
import apache_beam as beam
from apache_beam.io.gcp.spanner import ReadFromSpanner, TimestampBoundMode, TimeUnit
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

pipeline_args = [
    '--runner=DataflowRunner',
    '--project=my-gcp',
    '--region=us-west1',
    '--temp_location=gs://my-test-bucket/tmp',
    '--staging_location=gs://my-test-bucket/staging',
]
pipeline_options = PipelineOptions(pipeline_args)
# save_main_session is defined earlier in my script (not shown here).
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

# The pipeline will be run on exiting the with block.
with beam.Pipeline(options=pipeline_options) as p:
    rows = (
        p
        | 'Read Rows' >> ReadFromSpanner(
            instance_id='my-spanner-instance',
            database_id='my-spanner-database',
            project_id='my-gcp-project',
            row_type=OriginalRow,
            sql='SELECT * FROM my_table_name limit 10',
            timestamp_bound_mode=TimestampBoundMode.MAX_STALENESS,
            staleness=3,
            time_unit=TimeUnit.HOURS,
        ).with_output_types(OriginalRow)
    )
but receive the following error:
`ValueError: not enough values to unpack (expected 2, got 0)`
I have been experimenting a bit with providing fewer columns and different
types, but I feel as though I am stumbling in the dark. I would greatly
appreciate any guidance you may have.
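To make the experimenting with types a little more systematic, here is a
minimal sketch (standard library only) that lists which fields of a NamedTuple
are annotated with something other than a plain Python type. SampleRow,
SIMPLE_TYPES and non_simple_fields are hypothetical names used only for
illustration, and the set of "simple" types is an assumption, not Beam's
authoritative list of supported schema types. It only shows which columns
might be worth swapping or dropping first.

```python
from datetime import datetime
from typing import NamedTuple, get_type_hints

# Assumption: these plain Python types are treated as "simple" for this check.
# This is an illustrative set, not Beam's authoritative list.
SIMPLE_TYPES = (str, int, float, bool, bytes)


class SampleRow(NamedTuple):
    # Hypothetical subset of the columns in OriginalRow.
    file_metadata_id: str
    file_size: int
    is_deleted: bool
    created_on: datetime


def non_simple_fields(row_type):
    """Return {field_name: annotation} for fields not annotated with a simple type."""
    hints = get_type_hints(row_type)
    return {name: tp for name, tp in hints.items() if tp not in SIMPLE_TYPES}


# Fields reported here are candidates to test one at a time.
print(non_simple_fields(SampleRow))
```

Running the same helper on the full OriginalRow would flag the JsonObject and
datetime columns, which seem like reasonable candidates to try removing or
retyping one by one.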
Sincerely,
Brendan
Brendan Wee, MS. | Engineer
Calico Life Sciences LLC | 1170 Veterans Blvd. | South San Francisco, CA 94080
T: (925) 788-8196 | brendan@calicolabs.com