Posted to commits@beam.apache.org by ch...@apache.org on 2019/01/30 18:37:21 UTC
[beam] branch master updated: [BEAM-3342] Create a Cloud Bigtable Python connector Write
This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 59499f6 [BEAM-3342] Create a Cloud Bigtable Python connector Write
new 054d42c Merge pull request #7367: [BEAM-3342] Create a Cloud Bigtable Python connector Write
59499f6 is described below
commit 59499f682204f51e1a5654973ac9e6084ab6b100
Author: Juan Rael <ju...@qlogic.io>
AuthorDate: Tue Jan 29 10:16:44 2019 -0500
[BEAM-3342] Create a Cloud Bigtable Python connector Write
---
.../jenkins/dependency_check/generate_report.sh | 2 +
.../job_beam_PerformanceTests_Analysis.groovy | 2 +-
ownership/PYTHON_DEPENDENCY_OWNERS.yaml | 6 +
.../examples/cookbook/bigtableio_it_test.py | 194 +++++++++++++++++++++
sdks/python/apache_beam/io/gcp/bigtableio.py | 143 +++++++++++++++
sdks/python/container/base_image_requirements.txt | 2 +
sdks/python/setup.py | 2 +
sdks/python/tox.ini | 5 +-
8 files changed, 354 insertions(+), 2 deletions(-)
diff --git a/.test-infra/jenkins/dependency_check/generate_report.sh b/.test-infra/jenkins/dependency_check/generate_report.sh
index 13c6a0f..734aa44 100755
--- a/.test-infra/jenkins/dependency_check/generate_report.sh
+++ b/.test-infra/jenkins/dependency_check/generate_report.sh
@@ -42,6 +42,8 @@ REPORT_DESCRIPTION="
/usr/bin/virtualenv dependency/check
. dependency/check/bin/activate
pip install --upgrade google-cloud-bigquery
+pip install --upgrade google-cloud-bigtable
+pip install --upgrade google-cloud-core
rm -f build/dependencyUpdates/beam-dependency-check-report.txt
# Install packages and run the unit tests of the report generator and the jira manager
diff --git a/.test-infra/jenkins/job_beam_PerformanceTests_Analysis.groovy b/.test-infra/jenkins/job_beam_PerformanceTests_Analysis.groovy
index 59c347d..efa8353 100644
--- a/.test-infra/jenkins/job_beam_PerformanceTests_Analysis.groovy
+++ b/.test-infra/jenkins/job_beam_PerformanceTests_Analysis.groovy
@@ -73,7 +73,7 @@ job(testConfiguration.jobName) {
shell('.env/bin/pip install --upgrade setuptools pip')
// Install job requirements for analysis script.
- shell('.env/bin/pip install requests google.cloud.bigquery mock')
+ shell('.env/bin/pip install requests google.cloud.bigquery mock google.cloud.bigtable google.cloud')
// Launch verification tests before executing script.
shell('.env/bin/python ' + commonJobProperties.checkoutDir + '/.test-infra/jenkins/verify_performance_test_results_test.py')
diff --git a/ownership/PYTHON_DEPENDENCY_OWNERS.yaml b/ownership/PYTHON_DEPENDENCY_OWNERS.yaml
index 7e46c6e..ba81490 100644
--- a/ownership/PYTHON_DEPENDENCY_OWNERS.yaml
+++ b/ownership/PYTHON_DEPENDENCY_OWNERS.yaml
@@ -42,9 +42,15 @@ deps:
google-apitools:
owners:
+ google-cloud-core:
+ owners:
+
google-cloud-bigquery:
owners: markflyhigh
+ google-cloud-bigtable:
+ owners:
+
google-cloud-pubsub:
owners: markflyhigh
diff --git a/sdks/python/apache_beam/examples/cookbook/bigtableio_it_test.py b/sdks/python/apache_beam/examples/cookbook/bigtableio_it_test.py
new file mode 100644
index 0000000..82e2869
--- /dev/null
+++ b/sdks/python/apache_beam/examples/cookbook/bigtableio_it_test.py
@@ -0,0 +1,194 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unittest for GCP Bigtable testing."""
+from __future__ import absolute_import
+
+import datetime
+import logging
+import random
+import string
+import unittest
+import uuid
+
+import pytz
+
+import apache_beam as beam
+from apache_beam.io.gcp.bigtableio import WriteToBigTable
+from apache_beam.metrics.metric import MetricsFilter
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.runners.runner import PipelineState
+from apache_beam.testing.test_pipeline import TestPipeline
+
+# Protect against environments where bigtable library is not available.
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+ from google.cloud._helpers import _datetime_from_microseconds
+ from google.cloud._helpers import _microseconds_from_datetime
+ from google.cloud._helpers import UTC
+ from google.cloud.bigtable import row, column_family, Client
+except ImportError:
+ Client = None
+ UTC = pytz.utc
+ _microseconds_from_datetime = lambda label_stamp: label_stamp
+ _datetime_from_microseconds = lambda micro: micro
+
+
+EXISTING_INSTANCES = []
+LABEL_KEY = u'python-bigtable-beam'
+label_stamp = datetime.datetime.utcnow().replace(tzinfo=UTC)
+label_stamp_micros = _microseconds_from_datetime(label_stamp)
+LABELS = {LABEL_KEY: str(label_stamp_micros)}
+
+
+class GenerateTestRows(beam.PTransform):
+ """ A transform test to run write to the Bigtable Table.
+
+ A PTransform that generate a list of `DirectRow` to write it in
+ Bigtable Table.
+
+ """
+ def __init__(self, number, project_id=None, instance_id=None,
+ table_id=None):
+ super(GenerateTestRows, self).__init__()
+ self.number = number
+ self.rand = random.choice(string.ascii_letters + string.digits)
+ self.column_family_id = 'cf1'
+ self.beam_options = {'project_id': project_id,
+ 'instance_id': instance_id,
+ 'table_id': table_id}
+
+ def _generate(self):
+ value = self.rand * 100 # a 100-character payload of one random character
+
+ for index in range(self.number):
+ key = "beam_key%s" % ('{0:07}'.format(index))
+ direct_row = row.DirectRow(row_key=key)
+ for column_id in range(10):
+ direct_row.set_cell(self.column_family_id,
+ ('field%s' % column_id).encode('utf-8'),
+ value,
+ datetime.datetime.now())
+ yield direct_row
+
+ def expand(self, pvalue):
+ beam_options = self.beam_options
+ return (pvalue
+ | beam.Create(self._generate())
+ | WriteToBigTable(beam_options['project_id'],
+ beam_options['instance_id'],
+ beam_options['table_id']))
+
+
+@unittest.skipIf(Client is None, 'GCP Bigtable dependencies are not installed')
+class BigtableIOWriteTest(unittest.TestCase):
+ """ Bigtable Write Connector Test
+
+ """
+ DEFAULT_TABLE_PREFIX = "python-test"
+ instance_id = DEFAULT_TABLE_PREFIX + "-" + str(uuid.uuid4())[:8]
+ cluster_id = DEFAULT_TABLE_PREFIX + "-" + str(uuid.uuid4())[:8]
+ table_id = DEFAULT_TABLE_PREFIX + "-" + str(uuid.uuid4())[:8]
+ number = 500
+ LOCATION_ID = "us-east1-b"
+
+ def setUp(self):
+ try:
+ from google.cloud.bigtable import enums
+ self.STORAGE_TYPE = enums.StorageType.HDD
+ self.INSTANCE_TYPE = enums.Instance.Type.DEVELOPMENT
+ except ImportError:
+ self.STORAGE_TYPE = 2
+ self.INSTANCE_TYPE = 2
+
+ self.test_pipeline = TestPipeline(is_integration_test=True)
+ self.runner_name = type(self.test_pipeline.runner).__name__
+ self.project = self.test_pipeline.get_option('project')
+ self.client = Client(project=self.project, admin=True)
+
+ self._delete_old_instances()
+
+ self.instance = self.client.instance(self.instance_id,
+ instance_type=self.INSTANCE_TYPE,
+ labels=LABELS)
+
+ if not self.instance.exists():
+ cluster = self.instance.cluster(self.cluster_id,
+ self.LOCATION_ID,
+ default_storage_type=self.STORAGE_TYPE)
+ self.instance.create(clusters=[cluster])
+ self.table = self.instance.table(self.table_id)
+
+ if not self.table.exists():
+ max_versions_rule = column_family.MaxVersionsGCRule(2)
+ column_family_id = 'cf1'
+ column_families = {column_family_id: max_versions_rule}
+ self.table.create(column_families=column_families)
+
+ def _delete_old_instances(self):
+ instances = self.client.list_instances()
+ EXISTING_INSTANCES[:] = instances
+
+ def age_in_hours(micros):
+ return (datetime.datetime.utcnow().replace(tzinfo=UTC) - (
+ _datetime_from_microseconds(micros))).total_seconds() // 3600
+ clean_instances = [i for sublist in EXISTING_INSTANCES for i in sublist if (
+ LABEL_KEY in i.labels.keys() and
+ age_in_hours(int(i.labels[LABEL_KEY])) >= 2)]
+
+ for instance in clean_instances:
+ instance.delete()
+
+ def tearDown(self):
+ if self.instance.exists():
+ self.instance.delete()
+
+ def test_bigtable_write(self):
+ number = self.number
+ pipeline_args = self.test_pipeline.options_list
+ pipeline_options = PipelineOptions(pipeline_args)
+
+ pipeline = beam.Pipeline(options=pipeline_options)
+ config_data = {'project_id': self.project,
+ 'instance_id': self.instance_id,
+ 'table_id': self.table_id}
+ _ = (
+ pipeline
+ | 'Generate Direct Rows' >> GenerateTestRows(number, **config_data))
+
+ result = pipeline.run()
+ result.wait_until_finish()
+
+ assert result.state == PipelineState.DONE
+
+ read_rows = self.table.read_rows()
+ assert sum(1 for _ in read_rows) == number
+
+ if not hasattr(result, 'has_job') or result.has_job:
+ read_filter = MetricsFilter().with_name('Written Row')
+ query_result = result.metrics().query(read_filter)
+ if query_result['counters']:
+ read_counter = query_result['counters'][0]
+
+ logging.info('Number of Rows: %d', read_counter.committed)
+ assert read_counter.committed == number
+
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
+ unittest.main()
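For reference, the rows this test generates can also be built directly with the google-cloud-bigtable client. A minimal sketch, assuming the google-cloud-bigtable 0.31.x API and a table with column family 'cf1':

    import datetime
    from google.cloud.bigtable import row

    direct_row = row.DirectRow(row_key=b'beam_key0000001')
    # An explicit timestamp matters here: if a bundle is retried, the
    # same DirectRow object is mutated again with fresh cell values.
    direct_row.set_cell('cf1',
                        b'field1',
                        b'value1',
                        datetime.datetime.now())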
diff --git a/sdks/python/apache_beam/io/gcp/bigtableio.py b/sdks/python/apache_beam/io/gcp/bigtableio.py
new file mode 100644
index 0000000..ccb10c5
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/bigtableio.py
@@ -0,0 +1,143 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""BigTable connector
+
+This module implements writing to BigTable tables.
+The default mode is to write row data, represented as `DirectRow`
+objects, to BigTable tables.
+The syntax supported is described here:
+https://cloud.google.com/bigtable/docs/quickstart-cbt
+
+The BigTable connector can be used as a main output. A main output
+(the common case) is expected to be massive and will be split into
+manageable chunks and processed in parallel. In the example below we
+create a PCollection of `DirectRow` objects and pass it to the
+WriteToBigTable transform, which uses the _BigTableWriteFn DoFn to
+insert the generated rows into the table.
+
+ main_table = (p
+ | beam.Create(self._generate())
+ | WriteToBigTable(project_id,
+ instance_id,
+ table_id))
+"""
+from __future__ import absolute_import
+
+import apache_beam as beam
+from apache_beam.metrics import Metrics
+from apache_beam.transforms.display import DisplayDataItem
+
+try:
+ from google.cloud.bigtable import Client
+except ImportError:
+ pass
+
+__all__ = ['WriteToBigTable']
+
+
+class _BigTableWriteFn(beam.DoFn):
+ """ Creates the connector can call and add_row to the batcher using each
+ row in beam pipe line
+ Args:
+ project_id(str): GCP Project ID
+ instance_id(str): GCP Instance ID
+ table_id(str): GCP Table ID
+
+ """
+
+ def __init__(self, project_id, instance_id, table_id):
+ """ Constructor of the Write connector of Bigtable
+ Args:
+ project_id(str): GCP Project of to write the Rows
+ instance_id(str): GCP Instance to write the Rows
+ table_id(str): GCP Table to write the `DirectRows`
+ """
+ super(_BigTableWriteFn, self).__init__()
+ self.beam_options = {'project_id': project_id,
+ 'instance_id': instance_id,
+ 'table_id': table_id}
+ self.table = None
+ self.batcher = None
+ self.written = Metrics.counter(self.__class__, 'Written Row')
+
+ def __getstate__(self):
+ return self.beam_options
+
+ def __setstate__(self, options):
+ self.beam_options = options
+ self.table = None
+ self.batcher = None
+ self.written = Metrics.counter(self.__class__, 'Written Row')
+
+ def start_bundle(self):
+ if self.table is None:
+ client = Client(project=self.beam_options['project_id'])
+ instance = client.instance(self.beam_options['instance_id'])
+ self.table = instance.table(self.beam_options['table_id'])
+ self.batcher = self.table.mutations_batcher()
+
+ def process(self, row):
+ self.written.inc()
+ # The timestamp must be set on each cell of this row object: on a
+ # retry the same object is mutated again, and the explicit timestamp
+ # ensures the cells are written with fresh values.
+ # Example:
+ # direct_row.set_cell('cf1',
+ # 'field1',
+ # 'value1',
+ # timestamp=datetime.datetime.now())
+ self.batcher.mutate(row)
+
+ def finish_bundle(self):
+ self.batcher.flush()
+ self.batcher = None
+
+ def display_data(self):
+ return {'projectId': DisplayDataItem(self.beam_options['project_id'],
+ label='Bigtable Project Id'),
+ 'instanceId': DisplayDataItem(self.beam_options['instance_id'],
+ label='Bigtable Instance Id'),
+ 'tableId': DisplayDataItem(self.beam_options['table_id'],
+ label='Bigtable Table Id')
+ }
+
+
+class WriteToBigTable(beam.PTransform):
+ """ A transform to write to the Bigtable Table.
+
+ A PTransform that writes a PCollection of `DirectRow` objects to a
+ Bigtable table.
+
+ """
+ def __init__(self, project_id=None, instance_id=None,
+ table_id=None):
+ """ The PTransform to access the Bigtable Write connector
+ Args:
+ project_id(str): GCP project to write the rows to
+ instance_id(str): GCP instance to write the rows to
+ table_id(str): GCP table to write the `DirectRow` objects to
+ """
+ super(WriteToBigTable, self).__init__()
+ self.beam_options = {'project_id': project_id,
+ 'instance_id': instance_id,
+ 'table_id': table_id}
+
+ def expand(self, pvalue):
+ beam_options = self.beam_options
+ return (pvalue
+ | beam.ParDo(_BigTableWriteFn(beam_options['project_id'],
+ beam_options['instance_id'],
+ beam_options['table_id'])))
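Putting the docstring example above in context, here is a minimal end-to-end sketch of the new write path. It assumes a pre-created table with column family 'cf1'; the project, instance, and table ids are hypothetical:

    import datetime

    import apache_beam as beam
    from apache_beam.io.gcp.bigtableio import WriteToBigTable
    from google.cloud.bigtable import row

    def _test_row(index):
        direct_row = row.DirectRow(row_key=('beam_key%07d' % index).encode())
        direct_row.set_cell('cf1', b'field1', b'value1',
                            datetime.datetime.now())
        return direct_row

    with beam.Pipeline() as p:
        _ = (p
             | beam.Create([_test_row(i) for i in range(100)])
             | WriteToBigTable(project_id='my-project',
                               instance_id='my-instance',
                               table_id='my-table'))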
diff --git a/sdks/python/container/base_image_requirements.txt b/sdks/python/container/base_image_requirements.txt
index 05ed780..1f8c41b 100644
--- a/sdks/python/container/base_image_requirements.txt
+++ b/sdks/python/container/base_image_requirements.txt
@@ -50,6 +50,8 @@ googledatastore==7.0.1
google-cloud-pubsub==0.39.0
google-cloud-bigquery==1.6.0
proto-google-cloud-datastore-v1==0.90.4
+google-cloud-bigtable==0.31.1
+google-cloud-core==0.28.1
# Optional packages
cython==0.28.1
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index d2f592f..7078646 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -142,6 +142,8 @@ GCP_REQUIREMENTS = [
'google-cloud-pubsub==0.39.0',
# GCP packages required by tests
'google-cloud-bigquery>=1.6.0,<1.7.0',
+ 'google-cloud-core==0.28.1',
+ 'google-cloud-bigtable==0.31.1',
]
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index e46bcde..040b71f 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -58,13 +58,16 @@ setenv =
BEAM_EXPERIMENTAL_PY3=1
RUN_SKIPPED_PY3_TESTS=0
modules =
- apache_beam.typehints,apache_beam.coders,apache_beam.options,apache_beam.tools,apache_beam.utils,apache_beam.internal,apache_beam.metrics,apache_beam.portability,apache_beam.pipeline_test,apache_beam.pvalue_test,apache_beam.runners,apache_beam.io.hadoopfilesystem_test,apache_beam.io.hdfs_integration_test,apache_beam.io.gcp.tests.utils_test,apache_beam.io.gcp.big_query_query_to_table_it_test,apache_beam.io.gcp.bigquery_io_read_it_test,apache_beam.io.gcp.bigquery_test,apache_beam.io.gcp. [...]
+ apache_beam.typehints,apache_beam.coders,apache_beam.options,apache_beam.tools,apache_beam.utils,apache_beam.internal,apache_beam.metrics,apache_beam.portability,apache_beam.pipeline_test,apache_beam.pvalue_test,apache_beam.runners,apache_beam.io.hadoopfilesystem_test,apache_beam.io.hdfs_integration_test,apache_beam.io.gcp.tests.utils_test,apache_beam.io.gcp.big_query_query_to_table_it_test,apache_beam.io.gcp.bigquery_io_read_it_test,apache_beam.io.gcp.bigquery_test,apache_beam.io.gcp. [...]
commands =
python --version
pip --version
{toxinidir}/scripts/run_tox_cleanup.sh
python setup.py nosetests --tests {[testenv:py3]modules}
{toxinidir}/scripts/run_tox_cleanup.sh
+deps =
+ google-cloud-core==0.28.1
+ google-cloud-bigtable==0.31.1
[testenv:py27-cython]
# cython tests are only expected to work in linux (2.x and 3.x)
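Since the Bigtable client is an optional GCP dependency (note the guarded import in bigtableio.py above), a quick smoke check that an environment can use the new connector might look like the following sketch; the pins mirror setup.py:

    # Assumes `pip install apache-beam[gcp]`, or the explicit pins
    # google-cloud-bigtable==0.31.1 and google-cloud-core==0.28.1.
    try:
        from google.cloud.bigtable import Client  # noqa: F401
        from apache_beam.io.gcp.bigtableio import WriteToBigTable  # noqa: F401
        print('Bigtable connector available')
    except ImportError:
        print('GCP Bigtable dependencies are not installed')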