You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/04/03 00:06:02 UTC

[GitHub] [beam] mf2199 commented on a change in pull request #8457: [BEAM-3342] Create a Cloud Bigtable IO connector for Python

mf2199 commented on a change in pull request #8457: [BEAM-3342] Create a Cloud Bigtable IO connector for Python
URL: https://github.com/apache/beam/pull/8457#discussion_r402664846
 
 

 ##########
 File path: sdks/python/apache_beam/io/gcp/bigtableio.py
 ##########
 @@ -141,3 +144,123 @@ def expand(self, pvalue):
             | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
                                           beam_options['instance_id'],
                                           beam_options['table_id'])))
+
+
+class _BigtableReadFn(beam.DoFn):
+  def __init__(self, project_id, instance_id, table_id, filter_=None):
+    """ A DoFn to parallelize reading from a Bigtable table
+
+    :type project_id: str
+    :param project_id: The ID of the project used for Bigtable access
+
+    :type instance_id: str
+    :param instance_id: The ID of the instance that owns the table.
+
+    :type table_id: str
+    :param table_id: The ID of the table.
+
+    :type filter_: :class:`.RowFilter`
+    :param filter_: (Optional) The filter to apply to the contents of the
+                    specified row(s). If unset, reads every column in
+                    each row.
+    """
+    super(self.__class__, self).__init__()
+    self._initialize({'project_id': project_id,
+                     'instance_id': instance_id,
+                     'table_id': table_id,
+                     'filter_': filter_})
+
+  def _initialize(self, options):
+    """ The defaults initializer, to assist with pickling
+
+    :return: None
+    """
+    self._options = options
+    self._table = None
+    self._counter = Metrics.counter(self.__class__, 'Rows Read')
+
+  def __getstate__(self):
+    return self._options
+
+  def __setstate__(self, options):
+    self._initialize(options)
+
+  def start_bundle(self):
+    # from google.cloud.bigtable import Client
+    if self._table is None:
+      # noinspection PyAttributeOutsideInit
+      self._table = Client(project=self._options['project_id'])\
+        .instance(self._options['instance_id'])\
+        .table(self._options['table_id'])
+
+  def process(self, source_bundle):
+    _start_key = source_bundle.start_position
+    _end_key = source_bundle.stop_position
+    for row in self._table.read_rows(_start_key, _end_key):
+      self._counter.inc()
+      yield row
+
+  def display_data(self):
+    return {'projectId': DisplayDataItem(self._options['project_id'],
+                                         label='Bigtable Project Id'),
+            'instanceId': DisplayDataItem(self._options['instance_id'],
+                                          label='Bigtable Instance Id'),
+            'tableId': DisplayDataItem(self._options['table_id'],
+                                       label='Bigtable Table Id')}
+
+
+class ReadFromBigtable(beam.PTransform):
 
 Review comment:
   Done.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services