Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2020/04/03 00:07:00 UTC

[jira] [Work logged] (BEAM-3342) Create a Cloud Bigtable IO connector for Python

     [ https://issues.apache.org/jira/browse/BEAM-3342?focusedWorklogId=415190&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-415190 ]

ASF GitHub Bot logged work on BEAM-3342:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/Apr/20 00:06
            Start Date: 03/Apr/20 00:06
    Worklog Time Spent: 10m 
      Work Description: mf2199 commented on pull request #8457: [BEAM-3342] Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r402664846
 
 

 ##########
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##########
 @@ -141,3 +144,123 @@ def expand(self, pvalue):
             | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
                                           beam_options['instance_id'],
                                           beam_options['table_id'])))
+
+
+class _BigtableReadFn(beam.DoFn):
+  def __init__(self, project_id, instance_id, table_id, filter_=None):
+    """ A DoFn to parallelize reading from a Bigtable table
+
+    :type project_id: str
+    :param project_id: The ID of the project used for Bigtable access
+
+    :type instance_id: str
+    :param instance_id: The ID of the instance that owns the table.
+
+    :type table_id: str
+    :param table_id: The ID of the table.
+
+    :type filter_: :class:`.RowFilter`
+    :param filter_: (Optional) The filter to apply to the contents of the
+                    specified row(s). If unset, reads every column in
+                    each row.
+    """
+    super(_BigtableReadFn, self).__init__()
+    self._initialize({'project_id': project_id,
+                     'instance_id': instance_id,
+                     'table_id': table_id,
+                     'filter_': filter_})
+
+  def _initialize(self, options):
+    """ The defaults initializer, to assist with pickling
+
+    :return: None
+    """
+    self._options = options
+    self._table = None
+    self._counter = Metrics.counter(self.__class__, 'Rows Read')
+
+  def __getstate__(self):
+    return self._options
+
+  def __setstate__(self, options):
+    self._initialize(options)
+
+  def start_bundle(self):
+    # from google.cloud.bigtable import Client
+    if self._table is None:
+      # noinspection PyAttributeOutsideInit
+      self._table = Client(project=self._options['project_id'])\
+        .instance(self._options['instance_id'])\
+        .table(self._options['table_id'])
+
+  def process(self, source_bundle):
+    _start_key = source_bundle.start_position
+    _end_key = source_bundle.stop_position
+    for row in self._table.read_rows(start_key=_start_key,
+                                     end_key=_end_key,
+                                     filter_=self._options['filter_']):
+      self._counter.inc()
+      yield row
+
+  def display_data(self):
+    return {'projectId': DisplayDataItem(self._options['project_id'],
+                                         label='Bigtable Project Id'),
+            'instanceId': DisplayDataItem(self._options['instance_id'],
+                                          label='Bigtable Instance Id'),
+            'tableId': DisplayDataItem(self._options['table_id'],
+                                       label='Bigtable Table Id')}
+
+
+class ReadFromBigtable(beam.PTransform):
 
 Review comment:
   Done.
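
   For readers following the quoted diff, the pickling pattern used by
   _BigtableReadFn (serialize only the options dict, rebuild everything else
   on the worker) can be sketched without Beam or Bigtable. This is a minimal
   illustration under stated assumptions, not the connector's code:
   LazyReader and FakeTable are hypothetical names, and a threading.Lock
   stands in for the client handle that cannot be pickled.

```python
import pickle
import threading


class FakeTable(object):
  """Hypothetical stand-in for a table handle; it cannot be pickled."""

  def __init__(self, table_id):
    self.table_id = table_id
    self._lock = threading.Lock()  # Locks are not picklable.

  def read_rows(self, start_key, end_key):
    # Made-up row keys in [start_key, end_key), for illustration only.
    return ['row-%d' % i for i in range(start_key, end_key)]


class LazyReader(object):
  """Keeps only plain options in its pickled state, like the quoted DoFn."""

  def __init__(self, table_id):
    self._initialize({'table_id': table_id})

  def _initialize(self, options):
    self._options = options
    self._table = None  # Created lazily, never serialized.

  def __getstate__(self):
    return self._options

  def __setstate__(self, options):
    self._initialize(options)

  def start_bundle(self):
    if self._table is None:
      self._table = FakeTable(self._options['table_id'])

  def process(self, start_key, end_key):
    for row in self._table.read_rows(start_key, end_key):
      yield row


reader = LazyReader('my-table')
reader.start_bundle()                       # The handle now exists locally.
clone = pickle.loads(pickle.dumps(reader))  # Only the options dict travels.
clone.start_bundle()                        # The copy rebuilds its own handle.
rows = list(clone.process(0, 3))
```

   Because __getstate__ returns only the options, the live handle is never
   serialized, and each unpickled copy recreates it in start_bundle.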
 


Issue Time Tracking
-------------------

    Worklog Id:     (was: 415190)
    Time Spent: 45h 40m  (was: 45.5h)

> Create a Cloud Bigtable IO connector for Python
> -----------------------------------------------
>
>                 Key: BEAM-3342
>                 URL: https://issues.apache.org/jira/browse/BEAM-3342
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Solomon Duskis
>            Assignee: Solomon Duskis
>            Priority: Major
>          Time Spent: 45h 40m
>  Remaining Estimate: 0h
>
> I would like to create a Cloud Bigtable python connector.


