You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2019/01/30 18:37:21 UTC

[beam] branch master updated: [BEAM-3342] Create a Cloud Bigtable Python connector Write

This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 59499f6  [BEAM-3342] Create a Cloud Bigtable Python connector Write
     new 054d42c  Merge pull request #7367: [BEAM-3342] Create a Cloud Bigtable Python connector Write
59499f6 is described below

commit 59499f682204f51e1a5654973ac9e6084ab6b100
Author: Juan Rael <ju...@qlogic.io>
AuthorDate: Tue Jan 29 10:16:44 2019 -0500

    [BEAM-3342] Create a Cloud Bigtable Python connector Write
---
 .../jenkins/dependency_check/generate_report.sh    |   2 +
 .../job_beam_PerformanceTests_Analysis.groovy      |   2 +-
 ownership/PYTHON_DEPENDENCY_OWNERS.yaml            |   6 +
 .../examples/cookbook/bigtableio_it_test.py        | 194 +++++++++++++++++++++
 sdks/python/apache_beam/io/gcp/bigtableio.py       | 143 +++++++++++++++
 sdks/python/container/base_image_requirements.txt  |   2 +
 sdks/python/setup.py                               |   2 +
 sdks/python/tox.ini                                |   5 +-
 8 files changed, 354 insertions(+), 2 deletions(-)

diff --git a/.test-infra/jenkins/dependency_check/generate_report.sh b/.test-infra/jenkins/dependency_check/generate_report.sh
index 13c6a0f..734aa44 100755
--- a/.test-infra/jenkins/dependency_check/generate_report.sh
+++ b/.test-infra/jenkins/dependency_check/generate_report.sh
@@ -42,6 +42,8 @@ REPORT_DESCRIPTION="
 /usr/bin/virtualenv dependency/check
 . dependency/check/bin/activate
 pip install --upgrade google-cloud-bigquery
+pip install --upgrade google-cloud-bigtable
+pip install --upgrade google-cloud-core
 rm -f build/dependencyUpdates/beam-dependency-check-report.txt
 
 # Insall packages and run the unit tests of the report generator and the jira manager
diff --git a/.test-infra/jenkins/job_beam_PerformanceTests_Analysis.groovy b/.test-infra/jenkins/job_beam_PerformanceTests_Analysis.groovy
index 59c347d..efa8353 100644
--- a/.test-infra/jenkins/job_beam_PerformanceTests_Analysis.groovy
+++ b/.test-infra/jenkins/job_beam_PerformanceTests_Analysis.groovy
@@ -73,7 +73,7 @@ job(testConfiguration.jobName) {
         shell('.env/bin/pip install --upgrade setuptools pip')
 
         // Install job requirements for analysis script.
-        shell('.env/bin/pip install requests google.cloud.bigquery mock')
+        shell('.env/bin/pip install requests google.cloud.bigquery mock google.cloud.bigtable google.cloud')
 
         // Launch verification tests before executing script.
         shell('.env/bin/python ' + commonJobProperties.checkoutDir + '/.test-infra/jenkins/verify_performance_test_results_test.py')
diff --git a/ownership/PYTHON_DEPENDENCY_OWNERS.yaml b/ownership/PYTHON_DEPENDENCY_OWNERS.yaml
index 7e46c6e..ba81490 100644
--- a/ownership/PYTHON_DEPENDENCY_OWNERS.yaml
+++ b/ownership/PYTHON_DEPENDENCY_OWNERS.yaml
@@ -42,9 +42,15 @@ deps:
   google-apitools:
     owners:
 
+  google-cloud-core:
+    owners:
+
   google-cloud-bigquery:
     owners: markflyhigh
 
+  google-cloud-bigtable:
+    owners:
+
   google-cloud-pubsub:
     owners: markflyhigh
 
diff --git a/sdks/python/apache_beam/examples/cookbook/bigtableio_it_test.py b/sdks/python/apache_beam/examples/cookbook/bigtableio_it_test.py
new file mode 100644
index 0000000..82e2869
--- /dev/null
+++ b/sdks/python/apache_beam/examples/cookbook/bigtableio_it_test.py
@@ -0,0 +1,194 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unittest for GCP Bigtable testing."""
+from __future__ import absolute_import
+
+import datetime
+import logging
+import random
+import string
+import unittest
+import uuid
+
+import pytz
+
+import apache_beam as beam
+from apache_beam.io.gcp.bigtableio import WriteToBigTable
+from apache_beam.metrics.metric import MetricsFilter
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.runners.runner import PipelineState
+from apache_beam.testing.test_pipeline import TestPipeline
+
+# Protect against environments where bigtable library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from google.cloud._helpers import _datetime_from_microseconds
+  from google.cloud._helpers import _microseconds_from_datetime
+  from google.cloud._helpers import UTC
+  from google.cloud.bigtable import row, column_family, Client
+except ImportError:
+  Client = None
+  UTC = pytz.utc
+  _microseconds_from_datetime = lambda label_stamp: label_stamp
+  _datetime_from_microseconds = lambda micro: micro
+
+
+EXISTING_INSTANCES = []  # refreshed by _delete_old_instances()
+LABEL_KEY = u'python-bigtable-beam'  # label key set on test instances
+label_stamp = datetime.datetime.utcnow().replace(tzinfo=UTC)  # import time
+label_stamp_micros = _microseconds_from_datetime(label_stamp)
+LABELS = {LABEL_KEY: str(label_stamp_micros)}  # passed to client.instance()
+
+
+class GenerateTestRows(beam.PTransform):
+  """ A transform test to run write to the Bigtable Table.
+
+  A PTransform that generate a list of `DirectRow` to write it in
+  Bigtable Table.
+
+  """
+  def __init__(self, number, project_id=None, instance_id=None,
+               table_id=None):
+    super(WriteToBigTable, self).__init__()
+    self.number = number
+    self.rand = random.choice(string.ascii_letters + string.digits)
+    self.column_family_id = 'cf1'
+    self.beam_options = {'project_id': project_id,
+                         'instance_id': instance_id,
+                         'table_id': table_id}
+
+  def _generate(self):
+    value = ''.join(self.rand for i in range(100))
+
+    for index in range(self.number):
+      key = "beam_key%s" % ('{0:07}'.format(index))
+      direct_row = row.DirectRow(row_key=key)
+      for column_id in range(10):
+        direct_row.set_cell(self.column_family_id,
+                            ('field%s' % column_id).encode('utf-8'),
+                            value,
+                            datetime.datetime.now())
+      yield direct_row
+
+  def expand(self, pvalue):
+    beam_options = self.beam_options
+    return (pvalue
+            | beam.Create(self._generate())
+            | WriteToBigTable(beam_options['project_id'],
+                              beam_options['instance_id'],
+                              beam_options['table_id']))
+
+
+@unittest.skipIf(Client is None, 'GCP Bigtable dependencies are not installed')
+class BigtableIOWriteTest(unittest.TestCase):
+  """ Bigtable Write Connector Test
+
+  """
+  DEFAULT_TABLE_PREFIX = "python-test"
+  instance_id = DEFAULT_TABLE_PREFIX + "-" + str(uuid.uuid4())[:8]
+  cluster_id = DEFAULT_TABLE_PREFIX + "-" + str(uuid.uuid4())[:8]
+  table_id = DEFAULT_TABLE_PREFIX + "-" + str(uuid.uuid4())[:8]
+  number = 500
+  LOCATION_ID = "us-east1-b"
+
+  def setUp(self):
+    try:
+      from google.cloud.bigtable import enums
+      self.STORAGE_TYPE = enums.StorageType.HDD
+      self.INSTANCE_TYPE = enums.Instance.Type.DEVELOPMENT
+    except ImportError:
+      self.STORAGE_TYPE = 2
+      self.INSTANCE_TYPE = 2
+
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.runner_name = type(self.test_pipeline.runner).__name__
+    self.project = self.test_pipeline.get_option('project')
+    self.client = Client(project=self.project, admin=True)
+
+    self._delete_old_instances()
+
+    self.instance = self.client.instance(self.instance_id,
+                                         instance_type=self.INSTANCE_TYPE,
+                                         labels=LABELS)
+
+    if not self.instance.exists():
+      cluster = self.instance.cluster(self.cluster_id,
+                                      self.LOCATION_ID,
+                                      default_storage_type=self.STORAGE_TYPE)
+      self.instance.create(clusters=[cluster])
+    self.table = self.instance.table(self.table_id)
+
+    if not self.table.exists():
+      max_versions_rule = column_family.MaxVersionsGCRule(2)
+      column_family_id = 'cf1'
+      column_families = {column_family_id: max_versions_rule}
+      self.table.create(column_families=column_families)
+
+  def _delete_old_instances(self):
+    instances = self.client.list_instances()
+    EXISTING_INSTANCES[:] = instances
+
+    def age_in_hours(micros):
+      return (datetime.datetime.utcnow().replace(tzinfo=UTC) - (
+          _datetime_from_microseconds(micros))).total_seconds() // 3600
+    CLEAN_INSTANCE = [i for instance in EXISTING_INSTANCES for i in instance if(
+        LABEL_KEY in i.labels.keys() and
+        (age_in_hours(int(i.labels[LABEL_KEY])) >= 2))]
+
+    if CLEAN_INSTANCE:
+      for instance in CLEAN_INSTANCE:
+        instance.delete()
+
+  def tearDown(self):
+    if self.instance.exists():
+      self.instance.delete()
+
+  def test_bigtable_write(self):
+    number = self.number
+    pipeline_args = self.test_pipeline.options_list
+    pipeline_options = PipelineOptions(pipeline_args)
+
+    with beam.Pipeline(options=pipeline_options) as pipeline:
+      config_data = {'project_id':self.project,
+                     'instance_id':self.instance,
+                     'table_id':self.table}
+      _ = (
+          pipeline
+          | 'Generate Direct Rows' >> GenerateTestRows(number, **config_data))
+
+      result = pipeline.run()
+      result.wait_until_finish()
+
+      assert result.state == PipelineState.DONE
+
+      read_rows = self.table.read_rows()
+      assert len([_ for _ in read_rows]) == number
+
+      if not hasattr(result, 'has_job') or result.has_job:
+        read_filter = MetricsFilter().with_name('Written Row')
+        query_result = result.metrics().query(read_filter)
+        if query_result['counters']:
+          read_counter = query_result['counters'][0]
+
+          logging.info('Number of Rows: %d', read_counter.committed)
+          assert read_counter.committed == number
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()
diff --git a/sdks/python/apache_beam/io/gcp/bigtableio.py b/sdks/python/apache_beam/io/gcp/bigtableio.py
new file mode 100644
index 0000000..ccb10c5
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/bigtableio.py
@@ -0,0 +1,143 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""BigTable connector
+
+This module implements writing to BigTable tables.
+The default mode is to set row data to write to BigTable tables.
+The syntax supported is described here:
+https://cloud.google.com/bigtable/docs/quickstart-cbt
+
+The BigTable connector can be used as a main output. A main output
+(the common case) is expected to be massive and will be split into
+manageable chunks and processed in parallel. In the example below,
+`self._generate()` yields `DirectRow` objects with their cells already
+set; `beam.Create` turns them into a PCollection and `WriteToBigTable`
+inserts those rows into the table.
+
+  main_table = (p
+                | beam.Create(self._generate())
+                | WriteToBigTable(project_id,
+                                  instance_id,
+                                  table_id))
+"""
+from __future__ import absolute_import
+
+import apache_beam as beam
+from apache_beam.metrics import Metrics
+from apache_beam.transforms.display import DisplayDataItem
+
+try:
+  from google.cloud.bigtable import Client
+except ImportError:
+  pass
+
+__all__ = ['WriteToBigTable']
+
+
+class _BigTableWriteFn(beam.DoFn):
+  """ Creates the connector can call and add_row to the batcher using each
+  row in beam pipe line
+  Args:
+    project_id(str): GCP Project ID
+    instance_id(str): GCP Instance ID
+    table_id(str): GCP Table ID
+
+  """
+
+  def __init__(self, project_id, instance_id, table_id):
+    """ Constructor of the Write connector of Bigtable
+    Args:
+      project_id(str): GCP Project of to write the Rows
+      instance_id(str): GCP Instance to write the Rows
+      table_id(str): GCP Table to write the `DirectRows`
+    """
+    super(_BigTableWriteFn, self).__init__()
+    self.beam_options = {'project_id': project_id,
+                         'instance_id': instance_id,
+                         'table_id': table_id}
+    self.table = None
+    self.batcher = None
+    self.written = Metrics.counter(self.__class__, 'Written Row')
+
+  def __getstate__(self):
+    return self.beam_options
+
+  def __setstate__(self, options):
+    self.beam_options = options
+    self.table = None
+    self.batcher = None
+    self.written = Metrics.counter(self.__class__, 'Written Row')
+
+  def start_bundle(self):
+    if self.table is None:
+      client = Client(project=self.beam_options['project_id'])
+      instance = client.instance(self.beam_options['instance_id'])
+      self.table = instance.table(self.beam_options['table_id'])
+    self.batcher = self.table.mutations_batcher()
+
+  def process(self, row):
+    self.written.inc()
+    # You need to set the timestamp in the cells in this row object,
+    # when we do a retry we will mutating the same object, but, with this
+    # we are going to set our cell with new values.
+    # Example:
+    # direct_row.set_cell('cf1',
+    #                     'field1',
+    #                     'value1',
+    #                     timestamp=datetime.datetime.now())
+    self.batcher.mutate(row)
+
+  def finish_bundle(self):
+    self.batcher.flush()
+    self.batcher = None
+
+  def display_data(self):
+    return {'projectId': DisplayDataItem(self.beam_options['project_id'],
+                                         label='Bigtable Project Id'),
+            'instanceId': DisplayDataItem(self.beam_options['instance_id'],
+                                          label='Bigtable Instance Id'),
+            'tableId': DisplayDataItem(self.beam_options['table_id'],
+                                       label='Bigtable Table Id')
+           }
+
+
+class WriteToBigTable(beam.PTransform):
+  """ A transform to write to the Bigtable Table.
+
+  A PTransform that write a list of `DirectRow` into the Bigtable Table
+
+  """
+  def __init__(self, project_id=None, instance_id=None,
+               table_id=None):
+    """ The PTransform to access the Bigtable Write connector
+    Args:
+      project_id(str): GCP Project of to write the Rows
+      instance_id(str): GCP Instance to write the Rows
+      table_id(str): GCP Table to write the `DirectRows`
+    """
+    super(WriteToBigTable, self).__init__()
+    self.beam_options = {'project_id': project_id,
+                         'instance_id': instance_id,
+                         'table_id': table_id}
+
+  def expand(self, pvalue):
+    beam_options = self.beam_options
+    return (pvalue
+            | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
+                                          beam_options['instance_id'],
+                                          beam_options['table_id'])))
diff --git a/sdks/python/container/base_image_requirements.txt b/sdks/python/container/base_image_requirements.txt
index 05ed780..1f8c41b 100644
--- a/sdks/python/container/base_image_requirements.txt
+++ b/sdks/python/container/base_image_requirements.txt
@@ -50,6 +50,8 @@ googledatastore==7.0.1
 google-cloud-pubsub==0.39.0
 google-cloud-bigquery==1.6.0
 proto-google-cloud-datastore-v1==0.90.4
+google-cloud-bigtable==0.31.1
+google-cloud-core==0.28.1
 
 # Optional packages
 cython==0.28.1
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index d2f592f..7078646 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -142,6 +142,8 @@ GCP_REQUIREMENTS = [
     'google-cloud-pubsub==0.39.0',
     # GCP packages required by tests
     'google-cloud-bigquery>=1.6.0,<1.7.0',
+    'google-cloud-core==0.28.1',
+    'google-cloud-bigtable==0.31.1',
 ]
 
 
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index e46bcde..040b71f 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -58,13 +58,16 @@ setenv =
   BEAM_EXPERIMENTAL_PY3=1
   RUN_SKIPPED_PY3_TESTS=0
 modules =
-  apache_beam.typehints,apache_beam.coders,apache_beam.options,apache_beam.tools,apache_beam.utils,apache_beam.internal,apache_beam.metrics,apache_beam.portability,apache_beam.pipeline_test,apache_beam.pvalue_test,apache_beam.runners,apache_beam.io.hadoopfilesystem_test,apache_beam.io.hdfs_integration_test,apache_beam.io.gcp.tests.utils_test,apache_beam.io.gcp.big_query_query_to_table_it_test,apache_beam.io.gcp.bigquery_io_read_it_test,apache_beam.io.gcp.bigquery_test,apache_beam.io.gcp. [...]
+  apache_beam.typehints,apache_beam.coders,apache_beam.options,apache_beam.tools,apache_beam.utils,apache_beam.internal,apache_beam.metrics,apache_beam.portability,apache_beam.pipeline_test,apache_beam.pvalue_test,apache_beam.runners,apache_beam.io.hadoopfilesystem_test,apache_beam.io.hdfs_integration_test,apache_beam.io.gcp.tests.utils_test,apache_beam.io.gcp.big_query_query_to_table_it_test,apache_beam.io.gcp.bigquery_io_read_it_test,apache_beam.io.gcp.bigquery_test,apache_beam.io.gcp. [...]
 commands =
   python --version
   pip --version
   {toxinidir}/scripts/run_tox_cleanup.sh
   python setup.py nosetests --tests {[testenv:py3]modules}
   {toxinidir}/scripts/run_tox_cleanup.sh
+deps =
+  google-cloud-core==0.28.1
+  google-cloud-bigtable==0.31.1
 
 [testenv:py27-cython]
 # cython tests are only expected to work in linux (2.x and 3.x)