Posted to user@beam.apache.org by Brendan Wee via user <us...@beam.apache.org> on 2023/04/13 18:59:42 UTC

Python SDK - modeling google spanner schema with named tuple

Hello,

I am trying to use the Apache Beam GCP SpannerIO module
<https://beam.apache.org/releases/pydoc/2.30.0/apache_beam.io.gcp.spanner.html>
to run a transformation on our Google Cloud Spanner database, but I am having
trouble creating a NamedTuple to read data. I have not been able to find
much documentation on how to do this and would love any advice or help
you can provide.

Our Spanner table schema is:

[image: Screen Shot 2023-03-23 at 11.29.14 AM.png]


I tried to model this with the following NamedTuple class:


from datetime import datetime
from typing import NamedTuple

from google.cloud.spanner_v1.data_types import JsonObject

class OriginalRow(NamedTuple):
    file_metadata_id: str
    project_id: str
    file_md5sum: str
    bucket_received: str
    file_size: int
    transfer_date: str
    storage_class: str
    user_metadata: JsonObject
    gcs_uri: str
    generation_numbers: JsonObject
    is_deleted: bool
    created_on: datetime
    last_modified: datetime
    gcs_object_name: str
    file_creator: str
    cpp_harmony_plate_name: str
    cpp_experiment_folder: str
    cpp_experiment_id: str
    cpp_compound_layout_id: str
    cpp_well: str

I am trying to read the rows from my spanner table using the following code:


import apache_beam as beam
from apache_beam.io.gcp.spanner import ReadFromSpanner, TimestampBoundMode, TimeUnit
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

pipeline_args = [
    '--runner=DataflowRunner',
    '--project=my-gcp',
    '--region=us-west1',
    '--temp_location=gs://my-test-bucket/tmp',
    '--staging_location=gs://my-test-bucket/staging',
]
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True

# The pipeline will be run on exiting the with block.
with beam.Pipeline(options=pipeline_options) as p:
    rows = (
        p
        | 'Read Rows' >> ReadFromSpanner(
                instance_id='my-spanner-instance',
                database_id='my-spanner-database',
                project_id='my-gcp-project',
                row_type=OriginalRow,
                sql='SELECT * FROM my_table_name limit 10',
                timestamp_bound_mode=TimestampBoundMode.MAX_STALENESS,
                staleness=3,
                time_unit=TimeUnit.HOURS,
            ).with_output_types(OriginalRow)
    )


but I receive the following error:

`ValueError: not enough values to unpack (expected 2, got 0)`


I have been experimenting a bit with providing fewer columns and different
types, but I feel as though I am stumbling in the dark. I would greatly
appreciate any guidance you may have.


Sincerely,

Brendan


Brendan Wee, MS. | Engineer
Calico Life Sciences LLC | 1170 Veterans Blvd. | South San Francisco, CA
94080
T: (925) 788-8196 | brendan@calicolabs.com

Re: Python SDK - modeling google spanner schema with named tuple

Posted by XQ Hu via user <us...@beam.apache.org>.
I only ran a very simple test, and it works. I suspect one of your columns
may be causing the issue. Maybe you can try selecting a few columns first.
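One way to narrow it down, sketched below under the assumption that the column names match the schema in your original message (NarrowRow is a hypothetical name, and my_table_name is the placeholder from your code): define a NamedTuple with only a few "safe" columns, build a query that selects exactly those columns in the same order, and pass the narrowed type as row_type to ReadFromSpanner. Then add columns back one at a time (the JSON and datetime ones are the likely suspects) until the error reappears.

```python
from typing import NamedTuple

# Hypothetical narrowed row type with just a few columns, to isolate
# which column/type breaks schema inference. Field names are assumed
# to match the table in the original post.
class NarrowRow(NamedTuple):
    file_metadata_id: str
    file_size: int
    is_deleted: bool

# The query must select exactly these columns, in the same order as the
# NamedTuple fields; deriving it from _fields keeps the two in sync.
sql = "SELECT {} FROM my_table_name LIMIT 10".format(", ".join(NarrowRow._fields))
print(sql)
```

Once this narrowed read succeeds, you would use the same NarrowRow both as the row_type argument and in with_output_types, just as in your original pipeline.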

Below is my simple test:

from typing import NamedTuple

import apache_beam as beam
from apache_beam.io.gcp.spanner import ReadFromSpanner

# Define the schema for the data to be read from Spanner using a NamedTuple
class MyData(NamedTuple):
    id: int
    person_name: str
    age: int

# Define the pipeline options
options = beam.pipeline.PipelineOptions()

# Define the Spanner configuration and query
# query = "SELECT id, person_name, age FROM test_xqhu"
query = "SELECT * FROM test_xqhu"

# Define the pipeline and load the data from Spanner into a
# PCollection of NamedTuples
with beam.Pipeline(options=options) as p:
    data = (
        p
        | ReadFromSpanner(
            instance_id="test-spanner-io-xqhu",
            database_id="test",
            project_id="manav-jit-test",
            row_type=MyData,
            sql=query,
        )
        | beam.Map(lambda x: MyData(x.id, x.person_name, x.age))
        | beam.Map(print)
    )


On Thu, Apr 13, 2023 at 3:00 PM Brendan Wee via user <us...@beam.apache.org>
wrote:
