You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by Brendan Wee via user <us...@beam.apache.org> on 2023/03/24 00:01:16 UTC

Python SDK - apache_beam.io.gcp.spanner module - How to model Spanner Schema with Named Tuple

Hello,

I am trying to use the apache beam GCP spannerIO module
<https://beam.apache.org/releases/pydoc/2.30.0/apache_beam.io.gcp.spanner.html>
to
make a transformation on our Google Cloud Spanner database but I am having
trouble creating a Named Tuple to read data. I have not been able to find
much documentation on how to do this and would love any advice or help
you can provide.

Our spanner table schema is:

[image: Screen Shot 2023-03-23 at 11.29.14 AM.png]


I tried to model this using the following Named_Tuple class:


from google.cloud.spanner_v1.data_types import JsonObject

class OriginalRow(NamedTuple):
    file_metadata_id: str
    project_id: str
    file_md5sum: str
    bucket_received: str
    file_size: int
    transfer_date: str
    storage_class: str
    user_metadata: JsonObject
    gcs_uri: str
    generation_numbers: JsonObject
    is_deleted: bool
    created_on: datetime
    last_modified: datetime
    gcs_object_name: str
    file_creator: str
    cpp_harmony_plate_name: str
    cpp_experiment_folder: str
    cpp_experiment_id: str
    cpp_compound_layout_id: str
    cpp_well: str

I am trying to read the rows from my spanner table using the following code:


pipeline_args = [
    '--runner=DataflowRunner',
    '--project='my-gcp',
    '--region=us-west1',
    '--temp_location=gs://my-test-bucket/tmp'
    '--staging_location=gs://my-test-bucket/staging'
]
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

# The pipeline will be run on exiting the with block.
with beam.Pipeline(options=pipeline_options) as p:
    rows = (
        p
        | 'Read Rows' >> ReadFromSpanner(
                instance_id='my-spanner-instance',
                database_id='my-spanner-database',
                project_id='my-gcp-project',
                row_type=OriginalRow,
                sql='SELECT * FROM my_table_name limit 10',
                timestamp_bound_mode=TimestampBoundMode.MAX_STALENESS,
                staleness=3,
                time_unit=TimeUnit.HOURS,
            ).with_output_types(OriginalRow)
    )


but receive the following error:

`ValueError: not enough values to unpack (expected 2, got 0)`


I have been experimenting a bit with providing fewer columns, and different
types but feel as though I am stumbling in the dark. I would greatly
appreciate any guidance you may have.


Sincerely,

Brendan


Brendan Wee, MS. | Engineer
Calico Life Sciences LLC | 1170 Veterans Blvd. | South San Francisco, CA
94080
T: (925) 788-8196 | brendan@calicolabs.com