You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mw...@apache.org on 2020/02/13 10:50:03 UTC

[beam] branch BEAM-9258-cloud-dlp-python created (now 012354e)

This is an automated email from the ASF dual-hosted git repository.

mwalenia pushed a change to branch BEAM-9258-cloud-dlp-python
in repository https://gitbox.apache.org/repos/asf/beam.git.


      at 012354e  [BEAM-9258] Add cloud DLP transform to Python SDK

This branch includes the following new commits:

     new 012354e  [BEAM-9258] Add cloud DLP transform to Python SDK

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[beam] 01/01: [BEAM-9258] Add cloud DLP transform to Python SDK

Posted by mw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mwalenia pushed a commit to branch BEAM-9258-cloud-dlp-python
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 012354e625b3739a850d6c06f2b9b760b78a24d2
Author: Michal Walenia <mi...@polidea.com>
AuthorDate: Mon Feb 10 13:57:27 2020 +0100

    [BEAM-9258] Add cloud DLP transform to Python SDK
---
 sdks/python/apache_beam/ml/__init__.py           |  17 ++
 sdks/python/apache_beam/ml/gcp/__init__.py       |   0
 sdks/python/apache_beam/ml/gcp/cloud_dlp.py      | 207 +++++++++++++++++++++++
 sdks/python/apache_beam/ml/gcp/cloud_dlp_test.py | 145 ++++++++++++++++
 sdks/python/setup.py                             |   1 +
 5 files changed, 370 insertions(+)

diff --git a/sdks/python/apache_beam/ml/__init__.py b/sdks/python/apache_beam/ml/__init__.py
new file mode 100644
index 0000000..a4f2cfb
--- /dev/null
+++ b/sdks/python/apache_beam/ml/__init__.py
@@ -0,0 +1,17 @@
+#  /*
+#   * Licensed to the Apache Software Foundation (ASF) under one
+#   * or more contributor license agreements.  See the NOTICE file
+#   * distributed with this work for additional information
+#   * regarding copyright ownership.  The ASF licenses this file
+#   * to you under the Apache License, Version 2.0 (the
+#   * "License"); you may not use this file except in compliance
+#   * with the License.  You may obtain a copy of the License at
+#   *
+#   *     http://www.apache.org/licenses/LICENSE-2.0
+#   *
+#   * Unless required by applicable law or agreed to in writing, software
+#   * distributed under the License is distributed on an "AS IS" BASIS,
+#   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   * See the License for the specific language governing permissions and
+#   * limitations under the License.
+#   */
diff --git a/sdks/python/apache_beam/ml/gcp/__init__.py b/sdks/python/apache_beam/ml/gcp/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/sdks/python/apache_beam/ml/gcp/cloud_dlp.py b/sdks/python/apache_beam/ml/gcp/cloud_dlp.py
new file mode 100644
index 0000000..1ad2d51
--- /dev/null
+++ b/sdks/python/apache_beam/ml/gcp/cloud_dlp.py
@@ -0,0 +1,207 @@
+#  /*
+#   * Licensed to the Apache Software Foundation (ASF) under one
+#   * or more contributor license agreements.  See the NOTICE file
+#   * distributed with this work for additional information
+#   * regarding copyright ownership.  The ASF licenses this file
+#   * to you under the Apache License, Version 2.0 (the
+#   * "License"); you may not use this file except in compliance
+#   * with the License.  You may obtain a copy of the License at
+#   *
+#   *     http://www.apache.org/licenses/LICENSE-2.0
+#   *
+#   * Unless required by applicable law or agreed to in writing, software
+#   * distributed under the License is distributed on an "AS IS" BASIS,
+#   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   * See the License for the specific language governing permissions and
+#   * limitations under the License.
+#   */
+
+"""``PTransforms`` that implement Google Cloud Data Loss Prevention functionality.
+"""
+
+from __future__ import absolute_import
+
+import logging
+
+from google.cloud import dlp_v2
+
+import apache_beam as beam
+from apache_beam.utils import retry
+from apache_beam.utils.annotations import experimental
+
+__all__ = ['MaskDetectedDetails', 'InspectForDetails']
+
+_LOGGER = logging.getLogger(__name__)
+
+
+@experimental()
+class MaskDetectedDetails(beam.PTransform):
+  """Scrubs sensitive information detected in text.
+  The ``PTransform`` returns a ``PCollection`` of ``str``.
+  Example usage::
+    pipeline | MaskDetectedDetails(project='example-gcp-project', deidentification_config={
+          'info_type_transformations': {
+              'transformations': [{
+                  'primitive_transformation': {
+                      'character_mask_config': {
+                          'masking_character': '#'
+                      }
+                  }
+              }]
+          }
+      }, inspection_config={'info_types': [{'name': 'EMAIL_ADDRESS'}]})
+  """
+  def __init__(
+      self,
+      project=None,
+      deidentification_template_name=None,
+      deidentification_config=None,
+      inspection_template_name=None,
+      inspection_config=None,
+      timeout=None):
+    """Initializes a :class:`MaskDetectedDetails` transform.
+    Args:
+      project (str): Required. GCP project in which the data processing is to be done
+      deidentification_template_name (str): Either this or `deidentification_config` required. Name of
+        deidentification template to be used on detected sensitive information instances in text.
+      deidentification_config (``Union[dict, google.cloud.dlp_v2.types.DeidentifyConfig]``):
+        Configuration for the de-identification of the content item.
+      inspection_template_name (str): This or `inspection_config` required. Name of inspection template to be used
+        to detect sensitive data in text.
+      inspection_config(``Union[dict, google.cloud.dlp_v2.types.InspectConfig]``):
+        Configuration for the inspector used to detect sensitive data in input text.
+      timeout (float): Optional. The amount of time, in seconds, to wait for
+        the request to complete. A ``ValueError`` is raised on invalid arguments.
+    """
+    self.config = {}
+    self.project = project
+    self.timeout = timeout
+    if project is None:
+      raise ValueError(
+          'GCP project name needs to be specified in "project" property')
+    if deidentification_template_name is not None and deidentification_config is not None:
+      raise ValueError(
+          'Both deidentification_template_name and deidentification_config were specified.'
+          ' Please specify only one of these.')
+    elif deidentification_template_name is None and deidentification_config is None:
+      raise ValueError(
+          'deidentification_template_name or deidentification_config must be specified.'
+      )
+    elif deidentification_template_name is not None:
+      self.config['deidentify_template_name'] = deidentification_template_name
+    else:
+      self.config['deidentify_config'] = deidentification_config
+
+    if inspection_template_name is not None and inspection_config is not None:
+      raise ValueError(
+          'Both inspection_template_name and inspection_config were specified.'
+          ' Please specify only one of these.')
+    elif inspection_config is None and inspection_template_name is None:
+      raise ValueError(
+          'inspection_template_name or inspection_config must be specified')
+    elif inspection_template_name is not None:
+      self.config['inspect_template_name'] = inspection_template_name
+    elif inspection_config is not None:
+      self.config['inspect_config'] = inspection_config
+
+  def expand(self, pcoll):
+    return (
+        pcoll
+        | beam.ParDo(_DeidentifyFn(self.config, self.timeout, self.project)))
+
+
+@experimental()
+class InspectForDetails(beam.PTransform):
+  """Inspects input text for sensitive information.
+  The ``PTransform`` returns a ``PCollection`` of ``List[google.cloud.dlp_v2.proto.dlp_pb2.Finding]``
+  Example usage::
+      pipeline | InspectForDetails(project='example-gcp-project',
+                inspection_config={'info_types': [{'name': 'EMAIL_ADDRESS'}]})
+  """
+  def __init__(
+      self,
+      inspection_template_name=None,
+      inspection_config=None,
+      project=None,
+      timeout=None):
+    """Initializes a :class:`InspectForDetails` transform.
+    Args:
+      inspection_template_name (str): This or `inspection_config` required. Name of inspection template to be used
+        to detect sensitive data in text.
+      inspection_config(``Union[dict, google.cloud.dlp_v2.types.InspectConfig]``):
+        Configuration for the inspector used to detect sensitive data in input text.
+      project (str): Required. Name of GCP project in which the processing will take place.
+      timeout (float): Optional. The amount of time, in seconds, to wait for
+        the request to complete. A ``ValueError`` is raised on invalid arguments.
+    """
+    self.project = project
+    self.timeout = timeout
+    self.config = {}
+    if project is None:
+      raise ValueError(
+          'GCP project name needs to be specified in "project" property')
+    if inspection_template_name is not None and inspection_config is not None:
+      raise ValueError(
+          'Both inspection_template_name and inspection_config were specified.'
+          ' Please specify only one of these.')
+    elif inspection_config is None and inspection_template_name is None:
+      raise ValueError(
+          'inspection_template_name or inspection_config must be specified')
+    elif inspection_template_name is not None:
+      self.config['inspect_template_name'] = inspection_template_name
+    elif inspection_config is not None:
+      self.config['inspect_config'] = inspection_config
+
+  def expand(self, pcoll):
+    return pcoll | beam.ParDo(
+        _InspectFn(self.config, self.timeout, self.project))
+
+
+class _DeidentifyFn(beam.DoFn):
+  """Calls DLP ``deidentify_content`` per element; yields ``item.value``."""
+  def __init__(self, config=None, timeout=None, project=None, client=None):
+    self.project = project
+    self.client = client
+    self.timeout = timeout
+    self.config = config
+
+  def start_bundle(self):
+    if self.client is None:
+      self.client = dlp_v2.DlpServiceClient()
+
+  def process(self, element, **kwargs):
+    # Per-call options come first; entries from ``self.config`` are merged on
+    # top and override them on key collision.
+    backoff = retry.with_exponential_backoff(
+        retry_filter=retry.
+        retry_on_server_errors_timeout_or_quota_issues_filter)
+    parent = self.client.project_path(self.project)
+    request_args = dict(timeout=self.timeout, retry=backoff, parent=parent)
+    request_args.update(self.config)
+    response = self.client.deidentify_content(
+        item={"value": element}, **request_args)
+    yield response.item.value
+
+
+class _InspectFn(beam.DoFn):
+  """Calls DLP ``inspect_content`` per element; yields its list of findings."""
+  def __init__(self, config=None, timeout=None, project=None):
+    self.config = config
+    self.timeout = timeout
+    self.client = None
+    self.project = project
+
+  def setup(self):
+    self.client = dlp_v2.DlpServiceClient()
+
+  def process(self, element, **kwargs):
+    params = {
+        'timeout': self.timeout,
+        'retry': retry.with_exponential_backoff(
+            retry_filter=retry.
+            retry_on_server_errors_timeout_or_quota_issues_filter),
+        'parent': self.client.project_path(self.project)
+    }
+    params.update(self.config)
+    operation = self.client.inspect_content(item={"value": element}, **params)
+    yield list(operation.result.findings)
diff --git a/sdks/python/apache_beam/ml/gcp/cloud_dlp_test.py b/sdks/python/apache_beam/ml/gcp/cloud_dlp_test.py
new file mode 100644
index 0000000..95670ce
--- /dev/null
+++ b/sdks/python/apache_beam/ml/gcp/cloud_dlp_test.py
@@ -0,0 +1,145 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for Google Cloud Data Loss Prevention (DLP) transforms."""
+
+import logging
+import unittest
+
+import mock
+
+import apache_beam as beam
+from apache_beam.ml.gcp.cloud_dlp import MaskDetectedDetails
+from apache_beam.ml.gcp.cloud_dlp import InspectForDetails
+from apache_beam.ml.gcp.cloud_dlp import _DeidentifyFn
+from apache_beam.ml.gcp.cloud_dlp import _InspectFn
+from apache_beam.metrics import Metrics
+from apache_beam.testing.test_pipeline import TestPipeline
+
+_LOGGER = logging.getLogger(__name__)
+
+
+class TestDeidentifyText(unittest.TestCase):
+  def test_exception_raised_when_both_template_and_config_are_provided(self):
+    deidentify_config = {
+        "info_type_transformations": {
+            "transformations": [{
+                "primitive_transformation": {
+                    "character_mask_config": {
+                        "masking_character": '#'
+                    }
+                }
+            }]
+        }
+    }
+    with self.assertRaises(ValueError):
+      with TestPipeline() as p:
+        # Args: project, template name, config. pylint: disable=expression-not-assigned
+        p | MaskDetectedDetails('test', 'template-name-1', deidentify_config)
+
+  def test_exception_raised_when_no_config_is_provided(self):
+    with self.assertRaises(ValueError):
+      with TestPipeline() as p:
+        # pylint: disable=expression-not-assigned
+        p | MaskDetectedDetails(project='test')
+
+
+class TestDeidentifyFn(unittest.TestCase):
+  def test_deidentify_called(self):
+    class ClientMock(object):
+      def deidentify_content(self, *args, **kwargs):
+        # Count every API call so the test can assert on the total.
+        Metrics.counter('test_deidentify_text', 'called').inc()
+        response = mock.Mock()
+        response.item = mock.Mock()
+        response.item.value = [None]
+        return response
+
+      def project_path(self, *args):
+        return 'test'
+
+    emails = ['mary.sue@example.com', 'john.doe@example.com']
+    deidentify_config = {
+        "info_type_transformations": {
+            "transformations": [{
+                "primitive_transformation": {
+                    "character_mask_config": {
+                        "masking_character": '#'
+                    }
+                }
+            }]
+        }
+    }
+    with mock.patch('google.cloud.dlp_v2.DlpServiceClient', ClientMock):
+      pipeline = TestPipeline()
+      # pylint: disable=expression-not-assigned
+      (
+          pipeline
+          | beam.Create(emails)
+          | beam.ParDo(_DeidentifyFn(config=deidentify_config, project='test')))
+      result = pipeline.run()
+      result.wait_until_finish()
+    counters = result.metrics().query()['counters']
+    self.assertEqual(counters[0].result, 2)
+
+
+class TestInspectText(unittest.TestCase):
+  def test_exception_raised_when_both_template_and_config_are_provided(self):
+    inspect_config = {"info_types": [{"name": "EMAIL_ADDRESS"}]}
+    with self.assertRaises(ValueError):
+      with TestPipeline() as p:
+        # pylint: disable=expression-not-assigned
+        p | InspectForDetails('template-name-1', inspect_config, project='test')
+
+  def test_exception_raised_when_no_config_provided(self):
+    with self.assertRaises(ValueError):
+      with TestPipeline() as p:
+        # pylint: disable=expression-not-assigned
+        p | InspectForDetails(project='test')
+
+
+class TestInspectFn(unittest.TestCase):
+  def test_inspect_called(self):
+    class ClientMock(object):
+      def inspect_content(self, *args, **kwargs):
+        called = Metrics.counter('test_inspect_text', 'called')
+        called.inc()
+        operation = mock.Mock()
+        operation.result = mock.Mock()
+        operation.result.findings = [None]
+        return operation
+
+      def project_path(self, *args):
+        return 'test'
+
+    with mock.patch('google.cloud.dlp_v2.DlpServiceClient', ClientMock):
+      p = TestPipeline()
+      inspect_config = {"info_types": [{"name": "EMAIL_ADDRESS"}]}
+      # pylint: disable=expression-not-assigned
+      (
+          p
+          | beam.Create(['mary.sue@example.com', 'john.doe@example.com'])
+          | beam.ParDo(_InspectFn(config=inspect_config, project='test')))
+      result = p.run()
+      result.wait_until_finish()
+    called = result.metrics().query()['counters'][0]
+    self.assertEqual(called.result, 2)
+
+
+if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 06691cb..445f859 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -203,6 +203,7 @@ GCP_REQUIREMENTS = [
     'google-cloud-bigquery>=1.6.0,<1.18.0',
     'google-cloud-core>=0.28.1,<2',
     'google-cloud-bigtable>=0.31.1,<1.1.0',
+    'google-cloud-dlp >=0.12.0,<=0.13.0',
     # [BEAM-4543] googledatastore is not supported in Python 3.
     'proto-google-cloud-datastore-v1>=0.90.0,<=0.90.4; python_version < "3.0"',
     'google-cloud-spanner>=1.13.0,<1.14.0',