You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by Pierre Oberholzer <pi...@gmail.com> on 2021/10/08 20:48:10 UTC

Beam/Python to BigTable

Dear Community,

I'm trying to use the following script to write into BigTable using
Beam/DataFlow, but get the below error. Did anyone manage to run this
script ?

Thanks for your support !

*Python*: 3.7

*Code*

1) Create instance
gcloud beta bigtable instances create test-instance --cluster test-cluster
--display-name test-instance --cluster-zone us-central1-a
--cluster-num-nodes 3

2) Main logic
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigtableio.py

3) Homemade pipeline

pipeline_options = PipelineOptions(
    save_main_session=True, streaming=True,
    runner='DataflowRunner',
    project=PROJECT,
    region=REGION,
    temp_location=TEMP_LOCATION,
    staging_location=STAGING_LOCATION
)

def run ():

     with beam.Pipeline(options=pipeline_options) as p:

input_subscription=f"projects/{PROJECT}/subscriptions/{SUBSCRIPTION}"
        _ = (p
       | 'Read from Pub/Sub' >>
beam.io.ReadFromPubSub(subscription=input_subscription).with_output_types(bytes)
       | 'Conversion UTF-8 bytes to string' >> beam.Map(lambda msg:
msg.decode('utf-8'))
       | WriteToBigTable(PROJECT,INSTANCE,TABLE))

*Error*

run()
Traceback (most recent call last):

  File "<ipython-input-49-ec9775ede022>", line 1, in <module>
    run()

  File "/<mycode>/bigtableio.py", line 300, in run
    TABLE))

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/apache_beam/pipeline.py",
line 582, in __exit__
    self.result = self.run()

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/apache_beam/pipeline.py",
line 532, in run
    self._options).run(False)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/apache_beam/pipeline.py",
line 558, in run
    pickler.dump_session(os.path.join(tmpdir, 'main_session.pickle'))

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/apache_beam/internal/pickler.py",
line 313, in dump_session
    return dill.dump_session(file_path)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/dill/_dill.py",
line 351, in dump_session
    pickler.dump(main)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/dill/_dill.py",
line 445, in dump
    StockPickler.dump(self, obj)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 437, in dump
    self.save(obj)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 504, in save
    f(self, obj) # Call unbound method with explicit self

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/apache_beam/internal/pickler.py",
line 173, in save_module
    return old_save_module(pickler, obj)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/dill/_dill.py",
line 1295, in save_module
    state=_main_dict)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 662, in save_reduce
    save(state)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 504, in save
    f(self, obj) # Call unbound method with explicit self

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/apache_beam/internal/pickler.py",
line 221, in new_save_module_dict
    return old_save_module_dict(pickler, obj)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/dill/_dill.py",
line 912, in save_module_dict
    StockPickler.save_dict(pickler, obj)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 859, in save_dict
    self._batch_setitems(obj.items())

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 885, in _batch_setitems
    save(v)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 549, in save
    self.save_reduce(obj=obj, *rv)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 662, in save_reduce
    save(state)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 504, in save
    f(self, obj) # Call unbound method with explicit self

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/apache_beam/internal/pickler.py",
line 221, in new_save_module_dict
    return old_save_module_dict(pickler, obj)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/dill/_dill.py",
line 912, in save_module_dict
    StockPickler.save_dict(pickler, obj)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 859, in save_dict
    self._batch_setitems(obj.items())

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 885, in _batch_setitems
    save(v)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 549, in save
    self.save_reduce(obj=obj, *rv)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 662, in save_reduce
    save(state)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 504, in save
    f(self, obj) # Call unbound method with explicit self

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/apache_beam/internal/pickler.py",
line 221, in new_save_module_dict
    return old_save_module_dict(pickler, obj)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/dill/_dill.py",
line 912, in save_module_dict
    StockPickler.save_dict(pickler, obj)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 859, in save_dict
    self._batch_setitems(obj.items())

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 885, in _batch_setitems
    save(v)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 549, in save
    self.save_reduce(obj=obj, *rv)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 662, in save_reduce
    save(state)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 504, in save
    f(self, obj) # Call unbound method with explicit self

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/apache_beam/internal/pickler.py",
line 221, in new_save_module_dict
    return old_save_module_dict(pickler, obj)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/dill/_dill.py",
line 912, in save_module_dict
    StockPickler.save_dict(pickler, obj)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 859, in save_dict
    self._batch_setitems(obj.items())

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 885, in _batch_setitems
    save(v)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 549, in save
    self.save_reduce(obj=obj, *rv)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 662, in save_reduce
    save(state)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 504, in save
    f(self, obj) # Call unbound method with explicit self

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/apache_beam/internal/pickler.py",
line 221, in new_save_module_dict
    return old_save_module_dict(pickler, obj)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/site-packages/dill/_dill.py",
line 912, in save_module_dict
    StockPickler.save_dict(pickler, obj)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 859, in save_dict
    self._batch_setitems(obj.items())

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 885, in _batch_setitems
    save(v)

  File
"/Users/<myself>/opt/anaconda3/envs/venv_beam_bigtable_37/lib/python3.7/pickle.py",
line 524, in save
    rv = reduce(self.proto)

  File "stringsource", line 2, in
grpc._cython.cygrpc.Channel.__reduce_cython__

TypeError: no default __reduce__ due to non-trivial __cinit__

-- 
Best regards, Pierre

Re: Beam/Python to BigTable

Posted by Ahmet Altay <al...@google.com>.
Thank you for the update Pierre!

On Fri, Oct 22, 2021 at 1:13 PM Pierre Oberholzer <
pierre.oberholzer@gmail.com> wrote:

> Hi,
>
> I found the issue and can now write from Beam/Python to BigTable.
> You just need to create FIRST the column family before writing (here with
> cbt):
>
> `cbt -instance test-instance createfamily test-table cf1`
>

Is this documented?

@Israel Herraiz <ih...@google.com> @Chamikara Jayalath <ch...@google.com>
/cc @David Huntsperger <dh...@google.com>


> Confusing is that no error is thrown when the column family is not
> existing.
> There seems to be a similar issue with cbt [1]. It'd be great to correct
> this.
> Let me know if I should raise another bug.
>

Could you please file a jira issue at
https://issues.apache.org/jira/projects/BEAM/issues ?


>
> Thanks !
>
> [1] https://issuetracker.google.com/issues/186053077
>
> Pierre
>
> Le mer. 20 oct. 2021 à 04:18, Pierre Oberholzer <
> pierre.oberholzer@gmail.com> a écrit :
>
>> Hi Bryan,
>>
>> Thanks again for your reply last week.
>> I’ve raised a ticket here:
>>
>> https://issuetracker.google.com/issues/202977204
>>
>> Is that what you mean by GCP support ?
>> Any idea on how reactive it is ?
>> Any other alternative to use meanwhile (Java I/O in Python)?
>>
>> Thanks for your support !
>>
>> Best regards, Pierre
>>
>> ---------- Message transféré ---------
>> De : Pierre Oberholzer <pi...@gmail.com>
>> Date : sam. 16 oct. 2021 à 08:47
>> Objet : Re: Beam/Python to BigTable
>> À : <us...@beam.apache.org>, <ih...@google.com>
>>
>>
>> Hi Everyone,
>>
>> I have raised a bug on GCP for this.
>> But..am I the only one trying to write from Beam to BigTable in Python ?
>> Is that a warning sign showing that this combo is not mature ?
>> Is there any attempt using the Java connector in Python ?
>>
>> Glad to hear about your experience and advice - and of course about other
>> ideas to solve this "bug".
>>
>> Thanks !
>>
>> Le mer. 13 oct. 2021 à 18:14, Pierre Oberholzer <
>> pierre.oberholzer@gmail.com> a écrit :
>>
>>> Hi Brian,
>>>
>>> Yes I do execute a run() at the end, and I see the Dataflow completing
>>> on the GUI (link <https://console.cloud.google.com/dataflow/jobs>).
>>> Thanks for asking ;)
>>> Is there maybe a commit () missing as referred to here
>>> <https://googleapis.dev/python/bigtable/latest/row.html#google.cloud.bigtable.row.DirectRow>,
>>> and if yes, where to put it in the pipeline ?
>>>
>>> Le mer. 13 oct. 2021 à 18:08, Brian Hulette <bh...@google.com> a
>>> écrit :
>>>
>>>> Hey Pierre,
>>>> Sorry for the silly question but I have to ask - are you actually
>>>> running the pipeline? In your initial snippet you created the pipeline in a
>>>> context (with beam.Pipeline() as p:), which will run the pipeline when you
>>>> exit. But your latest snippet doesn't show the context, or a call to
>>>> p.run(). Are they missing, or just not shown?
>>>>
>>>> Otherwise I don't see anything obviously wrong with your code. You
>>>> might try contacting GCP support, since you're working with two GCP
>>>> products.
>>>>
>>>> Brian
>>>>
>>>> On Tue, Oct 12, 2021 at 10:22 PM Pierre Oberholzer <
>>>> pierre.oberholzer@gmail.com> wrote:
>>>>
>>>>> Dear Community,
>>>>>
>>>>> Glad to get your support here !
>>>>> Issue: empty BigTable when using the Python/Beam connector.
>>>>>
>>>>> Thanks !
>>>>>
>>>>> Le dim. 10 oct. 2021 à 14:34, Pierre Oberholzer <
>>>>> pierre.oberholzer@gmail.com> a écrit :
>>>>>
>>>>>> Thanks Israel, this helped. No error anymore, but the table remains
>>>>>> empty with this code
>>>>>> <https://stackoverflow.com/questions/63035772/streaming-pipeline-in-dataflow-to-bigtable-python>
>>>>>> .
>>>>>>
>>>>>> *Code*
>>>>>>
>>>>>> class CreateRowFn(beam.DoFn):
>>>>>>
>>>>>>     def process(self, key):
>>>>>>         direct_row = row.DirectRow(row_key=key)
>>>>>>         direct_row.set_cell(
>>>>>>             "stats_summary",
>>>>>>             b"os_build",
>>>>>>             b"android",
>>>>>>             datetime.datetime.now())
>>>>>>         return [direct_row]
>>>>>>
>>>>>> _ = (p
>>>>>>                 |
>>>>>> beam.Create(["phone#4c410523#20190501","phone#4c410523#20190502"])
>>>>>>                 | beam.ParDo(CreateRowFn())
>>>>>>                 |
>>>>>> WriteToBigTable(project_id=pipeline_options.bigtable_project,
>>>>>>
>>>>>> instance_id=pipeline_options.bigtable_instance,
>>>>>>
>>>>>> table_id=pipeline_options.bigtable_table)
>>>>>> *Issue*
>>>>>>
>>>>>> Empty table
>>>>>> (checked with happybase and check = [(key,row) for key, row in
>>>>>> table.scan()])
>>>>>>
>>>>>> Thanks !
>>>>>>
>>>>>> Le sam. 9 oct. 2021 à 21:37, Israel Herraiz <ih...@google.com> a
>>>>>> écrit :
>>>>>>
>>>>>>> You have to write DirectRows to Bigtable, not strings. For more
>>>>>>> info, please see
>>>>>>> https://googleapis.dev/python/bigtable/latest/row.html#google.cloud.bigtable.row.DirectRow
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Pierre
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Pierre
>>>>>
>>>>
>>>
>>> --
>>> Pierre
>>>
>>
>>
>> --
>> Pierre
>> --
>> Pierre
>>
>
>
> --
> Pierre
>

Re: Beam/Python to BigTable

Posted by Pierre Oberholzer <pi...@gmail.com>.
Hi,

I found the issue and can now write from Beam/Python to BigTable.
You just need to create FIRST the column family before writing (here with
cbt):

`cbt -instance test-instance createfamily test-table cf1`

Confusing is that no error is thrown when the column family is not existing.
There seems to be a similar issue with cbt [1]. It'd be great to correct
this.
Let me know if I should raise another bug.

Thanks !

[1] https://issuetracker.google.com/issues/186053077

Pierre

Le mer. 20 oct. 2021 à 04:18, Pierre Oberholzer <pi...@gmail.com>
a écrit :

> Hi Bryan,
>
> Thanks again for your reply last week.
> I’ve raised a ticket here:
>
> https://issuetracker.google.com/issues/202977204
>
> Is that what you mean by GCP support ?
> Any idea on how reactive it is ?
> Any other alternative to use meanwhile (Java I/O in Python)?
>
> Thanks for your support !
>
> Best regards, Pierre
>
> ---------- Message transféré ---------
> De : Pierre Oberholzer <pi...@gmail.com>
> Date : sam. 16 oct. 2021 à 08:47
> Objet : Re: Beam/Python to BigTable
> À : <us...@beam.apache.org>, <ih...@google.com>
>
>
> Hi Everyone,
>
> I have raised a bug on GCP for this.
> But..am I the only one trying to write from Beam to BigTable in Python ?
> Is that a warning sign showing that this combo is not mature ?
> Is there any attempt using the Java connector in Python ?
>
> Glad to hear about your experience and advice - and of course about other
> ideas to solve this "bug".
>
> Thanks !
>
> Le mer. 13 oct. 2021 à 18:14, Pierre Oberholzer <
> pierre.oberholzer@gmail.com> a écrit :
>
>> Hi Brian,
>>
>> Yes I do execute a run() at the end, and I see the Dataflow completing on
>> the GUI (link <https://console.cloud.google.com/dataflow/jobs>). Thanks
>> for asking ;)
>> Is there maybe a commit () missing as referred to here
>> <https://googleapis.dev/python/bigtable/latest/row.html#google.cloud.bigtable.row.DirectRow>,
>> and if yes, where to put it in the pipeline ?
>>
>> Le mer. 13 oct. 2021 à 18:08, Brian Hulette <bh...@google.com> a
>> écrit :
>>
>>> Hey Pierre,
>>> Sorry for the silly question but I have to ask - are you actually
>>> running the pipeline? In your initial snippet you created the pipeline in a
>>> context (with beam.Pipeline() as p:), which will run the pipeline when you
>>> exit. But your latest snippet doesn't show the context, or a call to
>>> p.run(). Are they missing, or just not shown?
>>>
>>> Otherwise I don't see anything obviously wrong with your code. You might
>>> try contacting GCP support, since you're working with two GCP products.
>>>
>>> Brian
>>>
>>> On Tue, Oct 12, 2021 at 10:22 PM Pierre Oberholzer <
>>> pierre.oberholzer@gmail.com> wrote:
>>>
>>>> Dear Community,
>>>>
>>>> Glad to get your support here !
>>>> Issue: empty BigTable when using the Python/Beam connector.
>>>>
>>>> Thanks !
>>>>
>>>> Le dim. 10 oct. 2021 à 14:34, Pierre Oberholzer <
>>>> pierre.oberholzer@gmail.com> a écrit :
>>>>
>>>>> Thanks Israel, this helped. No error anymore, but the table remains
>>>>> empty with this code
>>>>> <https://stackoverflow.com/questions/63035772/streaming-pipeline-in-dataflow-to-bigtable-python>
>>>>> .
>>>>>
>>>>> *Code*
>>>>>
>>>>> class CreateRowFn(beam.DoFn):
>>>>>
>>>>>     def process(self, key):
>>>>>         direct_row = row.DirectRow(row_key=key)
>>>>>         direct_row.set_cell(
>>>>>             "stats_summary",
>>>>>             b"os_build",
>>>>>             b"android",
>>>>>             datetime.datetime.now())
>>>>>         return [direct_row]
>>>>>
>>>>> _ = (p
>>>>>                 |
>>>>> beam.Create(["phone#4c410523#20190501","phone#4c410523#20190502"])
>>>>>                 | beam.ParDo(CreateRowFn())
>>>>>                 |
>>>>> WriteToBigTable(project_id=pipeline_options.bigtable_project,
>>>>>
>>>>> instance_id=pipeline_options.bigtable_instance,
>>>>>
>>>>> table_id=pipeline_options.bigtable_table)
>>>>> *Issue*
>>>>>
>>>>> Empty table
>>>>> (checked with happybase and check = [(key,row) for key, row in
>>>>> table.scan()])
>>>>>
>>>>> Thanks !
>>>>>
>>>>> Le sam. 9 oct. 2021 à 21:37, Israel Herraiz <ih...@google.com> a écrit :
>>>>>
>>>>>> You have to write DirectRows to Bigtable, not strings. For more info,
>>>>>> please see
>>>>>> https://googleapis.dev/python/bigtable/latest/row.html#google.cloud.bigtable.row.DirectRow
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Pierre
>>>>>
>>>>
>>>>
>>>> --
>>>> Pierre
>>>>
>>>
>>
>> --
>> Pierre
>>
>
>
> --
> Pierre
> --
> Pierre
>


-- 
Pierre

Re: Beam/Python to BigTable

Posted by Pierre Oberholzer <pi...@gmail.com>.
Hi Everyone,

I have raised a bug on GCP for this.
But..am I the only one trying to write from Beam to BigTable in Python ?
Is that a warning sign showing that this combo is not mature ?
Is there any attempt using the Java connector in Python ?

Glad to hear about your experience and advice - and of course about other
ideas to solve this "bug".

Thanks !

Le mer. 13 oct. 2021 à 18:14, Pierre Oberholzer <pi...@gmail.com>
a écrit :

> Hi Brian,
>
> Yes I do execute a run() at the end, and I see the Dataflow completing on
> the GUI (link <https://console.cloud.google.com/dataflow/jobs>). Thanks
> for asking ;)
> Is there maybe a commit () missing as referred to here
> <https://googleapis.dev/python/bigtable/latest/row.html#google.cloud.bigtable.row.DirectRow>,
> and if yes, where to put it in the pipeline ?
>
> Le mer. 13 oct. 2021 à 18:08, Brian Hulette <bh...@google.com> a
> écrit :
>
>> Hey Pierre,
>> Sorry for the silly question but I have to ask - are you actually running
>> the pipeline? In your initial snippet you created the pipeline in a context
>> (with beam.Pipeline() as p:), which will run the pipeline when you exit.
>> But your latest snippet doesn't show the context, or a call to p.run(). Are
>> they missing, or just not shown?
>>
>> Otherwise I don't see anything obviously wrong with your code. You might
>> try contacting GCP support, since you're working with two GCP products.
>>
>> Brian
>>
>> On Tue, Oct 12, 2021 at 10:22 PM Pierre Oberholzer <
>> pierre.oberholzer@gmail.com> wrote:
>>
>>> Dear Community,
>>>
>>> Glad to get your support here !
>>> Issue: empty BigTable when using the Python/Beam connector.
>>>
>>> Thanks !
>>>
>>> Le dim. 10 oct. 2021 à 14:34, Pierre Oberholzer <
>>> pierre.oberholzer@gmail.com> a écrit :
>>>
>>>> Thanks Israel, this helped. No error anymore, but the table remains
>>>> empty with this code
>>>> <https://stackoverflow.com/questions/63035772/streaming-pipeline-in-dataflow-to-bigtable-python>
>>>> .
>>>>
>>>> *Code*
>>>>
>>>> class CreateRowFn(beam.DoFn):
>>>>
>>>>     def process(self, key):
>>>>         direct_row = row.DirectRow(row_key=key)
>>>>         direct_row.set_cell(
>>>>             "stats_summary",
>>>>             b"os_build",
>>>>             b"android",
>>>>             datetime.datetime.now())
>>>>         return [direct_row]
>>>>
>>>> _ = (p
>>>>                 |
>>>> beam.Create(["phone#4c410523#20190501","phone#4c410523#20190502"])
>>>>                 | beam.ParDo(CreateRowFn())
>>>>                 |
>>>> WriteToBigTable(project_id=pipeline_options.bigtable_project,
>>>>
>>>> instance_id=pipeline_options.bigtable_instance,
>>>>
>>>> table_id=pipeline_options.bigtable_table)
>>>> *Issue*
>>>>
>>>> Empty table
>>>> (checked with happybase and check = [(key,row) for key, row in
>>>> table.scan()])
>>>>
>>>> Thanks !
>>>>
>>>> Le sam. 9 oct. 2021 à 21:37, Israel Herraiz <ih...@google.com> a écrit :
>>>>
>>>>> You have to write DirectRows to Bigtable, not strings. For more info,
>>>>> please see
>>>>> https://googleapis.dev/python/bigtable/latest/row.html#google.cloud.bigtable.row.DirectRow
>>>>>
>>>>
>>>>
>>>> --
>>>> Pierre
>>>>
>>>
>>>
>>> --
>>> Pierre
>>>
>>
>
> --
> Pierre
>


-- 
Pierre

Re: Beam/Python to BigTable

Posted by Brian Hulette <bh...@google.com>.
Hey Pierre,
Sorry for the silly question but I have to ask - are you actually running
the pipeline? In your initial snippet you created the pipeline in a context
(with beam.Pipeline() as p:), which will run the pipeline when you exit.
But your latest snippet doesn't show the context, or a call to p.run(). Are
they missing, or just not shown?

Otherwise I don't see anything obviously wrong with your code. You might
try contacting GCP support, since you're working with two GCP products.

Brian

On Tue, Oct 12, 2021 at 10:22 PM Pierre Oberholzer <
pierre.oberholzer@gmail.com> wrote:

> Dear Community,
>
> Glad to get your support here !
> Issue: empty BigTable when using the Python/Beam connector.
>
> Thanks !
>
> Le dim. 10 oct. 2021 à 14:34, Pierre Oberholzer <
> pierre.oberholzer@gmail.com> a écrit :
>
>> Thanks Israel, this helped. No error anymore, but the table remains empty
>> with this code
>> <https://stackoverflow.com/questions/63035772/streaming-pipeline-in-dataflow-to-bigtable-python>
>> .
>>
>> *Code*
>>
>> class CreateRowFn(beam.DoFn):
>>
>>     def process(self, key):
>>         direct_row = row.DirectRow(row_key=key)
>>         direct_row.set_cell(
>>             "stats_summary",
>>             b"os_build",
>>             b"android",
>>             datetime.datetime.now())
>>         return [direct_row]
>>
>> _ = (p
>>                 |
>> beam.Create(["phone#4c410523#20190501","phone#4c410523#20190502"])
>>                 | beam.ParDo(CreateRowFn())
>>                 |
>> WriteToBigTable(project_id=pipeline_options.bigtable_project,
>>
>> instance_id=pipeline_options.bigtable_instance,
>>
>> table_id=pipeline_options.bigtable_table)
>> *Issue*
>>
>> Empty table
>> (checked with happybase and check = [(key,row) for key, row in
>> table.scan()])
>>
>> Thanks !
>>
>> Le sam. 9 oct. 2021 à 21:37, Israel Herraiz <ih...@google.com> a écrit :
>>
>>> You have to write DirectRows to Bigtable, not strings. For more info,
>>> please see
>>> https://googleapis.dev/python/bigtable/latest/row.html#google.cloud.bigtable.row.DirectRow
>>>
>>
>>
>> --
>> Pierre
>>
>
>
> --
> Pierre
>

Re: Beam/Python to BigTable

Posted by Pierre Oberholzer <pi...@gmail.com>.
Dear Community,

Glad to get your support here !
Issue: empty BigTable when using the Python/Beam connector.

Thanks !

Le dim. 10 oct. 2021 à 14:34, Pierre Oberholzer <pi...@gmail.com>
a écrit :

> Thanks Israel, this helped. No error anymore, but the table remains empty
> with this code
> <https://stackoverflow.com/questions/63035772/streaming-pipeline-in-dataflow-to-bigtable-python>
> .
>
> *Code*
>
> class CreateRowFn(beam.DoFn):
>
>     def process(self, key):
>         direct_row = row.DirectRow(row_key=key)
>         direct_row.set_cell(
>             "stats_summary",
>             b"os_build",
>             b"android",
>             datetime.datetime.now())
>         return [direct_row]
>
> _ = (p
>                 |
> beam.Create(["phone#4c410523#20190501","phone#4c410523#20190502"])
>                 | beam.ParDo(CreateRowFn())
>                 |
> WriteToBigTable(project_id=pipeline_options.bigtable_project,
>
> instance_id=pipeline_options.bigtable_instance,
>                                   table_id=pipeline_options.bigtable_table)
> *Issue*
>
> Empty table
> (checked with happybase and check = [(key,row) for key, row in
> table.scan()])
>
> Thanks !
>
> Le sam. 9 oct. 2021 à 21:37, Israel Herraiz <ih...@google.com> a écrit :
>
>> You have to write DirectRows to Bigtable, not strings. For more info,
>> please see
>> https://googleapis.dev/python/bigtable/latest/row.html#google.cloud.bigtable.row.DirectRow
>>
>
>
> --
> Pierre
>


-- 
Pierre

Re: Beam/Python to BigTable

Posted by Pierre Oberholzer <pi...@gmail.com>.
Thanks Israel, this helped. No error anymore, but the table remains empty
with this code
<https://stackoverflow.com/questions/63035772/streaming-pipeline-in-dataflow-to-bigtable-python>
.

*Code*

class CreateRowFn(beam.DoFn):

    def process(self, key):
        direct_row = row.DirectRow(row_key=key)
        direct_row.set_cell(
            "stats_summary",
            b"os_build",
            b"android",
            datetime.datetime.now())
        return [direct_row]

_ = (p
                |
beam.Create(["phone#4c410523#20190501","phone#4c410523#20190502"])
                | beam.ParDo(CreateRowFn())
                |
WriteToBigTable(project_id=pipeline_options.bigtable_project,

instance_id=pipeline_options.bigtable_instance,
                                  table_id=pipeline_options.bigtable_table)
*Issue*

Empty table
(checked with happybase and check = [(key,row) for key, row in
table.scan()])

Thanks !

Le sam. 9 oct. 2021 à 21:37, Israel Herraiz <ih...@google.com> a écrit :

> You have to write DirectRows to Bigtable, not strings. For more info,
> please see
> https://googleapis.dev/python/bigtable/latest/row.html#google.cloud.bigtable.row.DirectRow
>


-- 
Pierre

Re: Beam/Python to BigTable

Posted by Israel Herraiz <ih...@google.com>.
You have to write DirectRows to Bigtable, not strings. For more info,
please see
https://googleapis.dev/python/bigtable/latest/row.html#google.cloud.bigtable.row.DirectRow