You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mw...@apache.org on 2020/04/29 09:40:17 UTC

[beam] branch BEAM-9723-java-dlp created (now a6c0dc3)

This is an automated email from the ASF dual-hosted git repository.

mwalenia pushed a change to branch BEAM-9723-java-dlp
in repository https://gitbox.apache.org/repos/asf/beam.git.


      at a6c0dc3  [BEAM-9723] Add DLP integration transforms

This branch includes the following new commits:

     new a6c0dc3  [BEAM-9723] Add DLP integration transforms

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[beam] 01/01: [BEAM-9723] Add DLP integration transforms

Posted by mw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mwalenia pushed a commit to branch BEAM-9723-java-dlp
in repository https://gitbox.apache.org/repos/asf/beam.git

commit a6c0dc3024f1defc1f659b2371f516925bc9d7f2
Author: Michal Walenia <mi...@polidea.com>
AuthorDate: Mon Apr 27 10:04:12 2020 +0200

    [BEAM-9723] Add DLP integration transforms
---
 sdks/java/extensions/ml/build.gradle               |   5 +-
 .../beam/sdk/extensions/ml/BatchRequestForDLP.java | 119 +++++++++++++
 .../beam/sdk/extensions/ml/DLPDeidentifyText.java  | 186 +++++++++++++++++++++
 .../beam/sdk/extensions/ml/DLPInspectText.java     | 147 ++++++++++++++++
 .../beam/sdk/extensions/ml/DLPReidentifyText.java  | 179 ++++++++++++++++++++
 .../sdk/extensions/ml/DLPTextOperationsIT.java     | 121 ++++++++++++++
 6 files changed, 756 insertions(+), 1 deletion(-)

diff --git a/sdks/java/extensions/ml/build.gradle b/sdks/java/extensions/ml/build.gradle
index 274c074..6f9b567 100644
--- a/sdks/java/extensions/ml/build.gradle
+++ b/sdks/java/extensions/ml/build.gradle
@@ -26,10 +26,13 @@ description = 'Apache Beam :: SDKs :: Java :: Extensions :: ML'
 dependencies {
     compile project(path: ":sdks:java:core", configuration: "shadow")
     compile project(":sdks:java:expansion-service")
-    testCompile project(path: ':sdks:java:core', configuration: 'shadowTest')
     compile 'com.google.cloud:google-cloud-video-intelligence:1.2.0'
+    compile 'com.google.cloud:google-cloud-dlp:1.1.1'
+    testCompile project(path: ':sdks:java:core', configuration: 'shadowTest')
     testCompile library.java.mockito_core
     testCompile 'com.google.cloud:google-cloud-video-intelligence:1.2.0'
+    testCompile 'com.google.cloud:google-cloud-dlp:1.1.1'
+    testCompile project(path: ":sdks:java:extensions:google-cloud-platform-core", configuration: "testRuntime")
     testCompile library.java.junit
     testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
     testRuntimeOnly project(":runners:google-cloud-dataflow-java")
diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/BatchRequestForDLP.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/BatchRequestForDLP.java
new file mode 100644
index 0000000..9a2d4d2
--- /dev/null
+++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/BatchRequestForDLP.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.KV;
+import org.joda.time.Instant;
+
+/**
+ * Stateful {@link DoFn} that groups the values of the input PCollection into bigger payloads in
+ * order to better utilize the Cloud DLP service.
+ *
+ * <p>Values are buffered in state until adding the next value would push the buffered UTF-8 size
+ * above {@code batchSize}. The buffer is then emitted as one concatenated element and a new
+ * buffer is started with the value that did not fit. An event-time timer, set to the timestamp
+ * of the most recently processed element, emits whatever is still buffered.
+ */
+class BatchRequestForDLP extends DoFn<KV<String, String>, KV<String, String>> {
+  /** Largest value, in bytes, accepted for {@code batchSize}. */
+  public static final Integer DLP_PAYLOAD_LIMIT = 52400;
+
+  private final Integer batchSize;
+
+  @StateId("elementsBag")
+  private final StateSpec<BagState<KV<String, String>>> elementsBag = StateSpecs.bag();
+
+  @StateId("elementsSize")
+  private final StateSpec<ValueState<Integer>> elementsSize = StateSpecs.value();
+
+  @TimerId("eventTimer")
+  private final TimerSpec eventTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+  /**
+   * @param batchSize maximum number of UTF-8 bytes to buffer before emitting a batch
+   * @throws IllegalArgumentException if {@code batchSize} is greater than {@link
+   *     #DLP_PAYLOAD_LIMIT}
+   */
+  public BatchRequestForDLP(Integer batchSize) {
+    if (batchSize > DLP_PAYLOAD_LIMIT) {
+      throw new IllegalArgumentException(
+          "DLP batch size exceeds payload limit.\n"
+              + "Batch size should be smaller than "
+              + DLP_PAYLOAD_LIMIT);
+    }
+    this.batchSize = batchSize;
+  }
+
+  /**
+   * Buffers {@code element}, first emitting the already buffered data when the element would not
+   * fit into the current batch.
+   */
+  @ProcessElement
+  public void process(
+      @Element KV<String, String> element,
+      @StateId("elementsBag") BagState<KV<String, String>> elementsBag,
+      @StateId("elementsSize") ValueState<Integer> elementsSize,
+      @Timestamp Instant elementTs,
+      @TimerId("eventTimer") Timer eventTimer,
+      OutputReceiver<KV<String, String>> output) {
+    eventTimer.set(elementTs);
+    int currentElementSize =
+        (element.getValue() == null) ? 0 : element.getValue().getBytes(UTF_8).length;
+    // Read the state once; a missing value means that nothing has been buffered yet.
+    Integer storedSize = elementsSize.read();
+    int currentBufferSize = (storedSize == null) ? 0 : storedSize;
+
+    // Only flush when there is buffered content, so that an element which is too big on its own
+    // does not cause an empty batch to be emitted.
+    if (currentBufferSize > 0 && currentElementSize + currentBufferSize > batchSize) {
+      KV<String, String> inspectBufferedData = emitResult(elementsBag.read());
+      output.output(inspectBufferedData);
+      DLPInspectText.LOG.info(
+          "****CLEAR BUFFER Key {} **** Current Content Size {}",
+          inspectBufferedData.getKey(),
+          inspectBufferedData.getValue().getBytes(UTF_8).length);
+      clearState(elementsBag, elementsSize);
+      currentBufferSize = 0;
+    }
+
+    // The current element is always buffered. Previously the element that triggered a flush was
+    // neither emitted nor stored, so it was silently dropped.
+    elementsBag.add(element);
+    elementsSize.write(currentElementSize + currentBufferSize);
+  }
+
+  /** Emits the records that are still buffered when the event-time timer fires. */
+  @OnTimer("eventTimer")
+  public void onTimer(
+      @StateId("elementsBag") BagState<KV<String, String>> elementsBag,
+      @StateId("elementsSize") ValueState<Integer> elementsSize,
+      OutputReceiver<KV<String, String>> output) {
+    // Process left over records less than batch size
+    KV<String, String> inspectBufferedData = emitResult(elementsBag.read());
+    output.output(inspectBufferedData);
+    DLPInspectText.LOG.info(
+        "****Timer Triggered Key {} **** Current Content Size {}",
+        inspectBufferedData.getKey(),
+        inspectBufferedData.getValue().getBytes(UTF_8).length);
+    clearState(elementsBag, elementsSize);
+  }
+
+  /**
+   * Concatenates the buffered values into a single element.
+   *
+   * @param bufferData buffered elements, in iteration order
+   * @return the concatenated values keyed by the key of the first buffered element, or by {@code
+   *     "UNKNOWN_FILE"} when the buffer is empty
+   */
+  private static KV<String, String> emitResult(Iterable<KV<String, String>> bufferData) {
+    StringBuilder builder = new StringBuilder();
+    String fileName =
+        (bufferData.iterator().hasNext()) ? bufferData.iterator().next().getKey() : "UNKNOWN_FILE";
+    for (KV<String, String> buffered : bufferData) {
+      // Null values are counted as zero bytes when buffering, so they must not be appended:
+      // StringBuilder would otherwise write the literal text "null" into the payload.
+      if (buffered.getValue() != null) {
+        builder.append(buffered.getValue());
+      }
+    }
+    return KV.of(fileName, builder.toString());
+  }
+
+  /** Resets both the buffered elements and the running byte count. */
+  private static void clearState(
+      BagState<KV<String, String>> elementsBag, ValueState<Integer> elementsSize) {
+    elementsBag.clear();
+    elementsSize.clear();
+  }
+}
diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyText.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyText.java
new file mode 100644
index 0000000..b7e037a
--- /dev/null
+++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyText.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.dlp.v2.DlpServiceClient;
+import com.google.privacy.dlp.v2.ContentItem;
+import com.google.privacy.dlp.v2.DeidentifyConfig;
+import com.google.privacy.dlp.v2.DeidentifyContentRequest;
+import com.google.privacy.dlp.v2.DeidentifyContentResponse;
+import com.google.privacy.dlp.v2.InspectConfig;
+import com.google.privacy.dlp.v2.ProjectName;
+import java.io.IOException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} connecting to Cloud DLP and deidentifying text according to provided
+ * settings.
+ *
+ * <p>Either inspectTemplateName (String) or inspectConfig ({@link InspectConfig}) needs to be
+ * set. The situation is the same with deidentifyTemplateName and deidentifyConfig ({@link
+ * DeidentifyConfig}).
+ *
+ * <p>Batch size defines how big are batches sent to DLP at once in bytes.
+ */
+@Experimental
+@AutoValue
+public abstract class DLPDeidentifyText
+    extends PTransform<PCollection<KV<String, String>>, PCollection<KV<String, String>>> {
+
+  // Bound to this class so that log records are attributed to the deidentify transform.
+  public static final Logger LOG = LoggerFactory.getLogger(DLPDeidentifyText.class);
+
+  /** Requests whose serialized size exceeds this number of bytes are not sent to DLP. */
+  public static final Integer DLP_PAYLOAD_LIMIT = 52400;
+
+  @Nullable
+  public abstract String inspectTemplateName();
+
+  @Nullable
+  public abstract String deidentifyTemplateName();
+
+  @Nullable
+  public abstract InspectConfig inspectConfig();
+
+  @Nullable
+  public abstract DeidentifyConfig deidentifyConfig();
+
+  public abstract Integer batchSize();
+
+  public abstract String projectId();
+
+  /** Builder for {@link DLPDeidentifyText}. */
+  @AutoValue.Builder
+  public abstract static class Builder {
+    public abstract Builder setInspectTemplateName(String inspectTemplateName);
+
+    public abstract Builder setBatchSize(Integer batchSize);
+
+    public abstract Builder setProjectId(String projectId);
+
+    public abstract Builder setDeidentifyTemplateName(String deidentifyTemplateName);
+
+    public abstract Builder setInspectConfig(InspectConfig inspectConfig);
+
+    public abstract Builder setDeidentifyConfig(DeidentifyConfig deidentifyConfig);
+
+    public abstract DLPDeidentifyText build();
+  }
+
+  public static DLPDeidentifyText.Builder newBuilder() {
+    return new AutoValue_DLPDeidentifyText.Builder();
+  }
+
+  /**
+   * The transform batches the contents of input PCollection and then calls Cloud DLP service to
+   * perform the deidentification.
+   *
+   * @param input input PCollection
+   * @return PCollection after transformations
+   */
+  @Override
+  public PCollection<KV<String, String>> expand(PCollection<KV<String, String>> input) {
+    return input
+        .apply("Batch Contents", ParDo.of(new BatchRequestForDLP(batchSize())))
+        .apply(
+            "DLPDeidentify",
+            ParDo.of(
+                new DeidentifyText(
+                    projectId(),
+                    inspectTemplateName(),
+                    deidentifyTemplateName(),
+                    inspectConfig(),
+                    deidentifyConfig())));
+  }
+
+  /** Sends every non-empty input value to Cloud DLP and outputs the deidentified text. */
+  static class DeidentifyText extends DoFn<KV<String, String>, KV<String, String>> {
+    private final String projectId;
+    private final String inspectTemplateName;
+    private final String deidentifyTemplateName;
+    private final InspectConfig inspectConfig;
+    private final DeidentifyConfig deidentifyConfig;
+    private transient DeidentifyContentRequest.Builder requestBuilder;
+    // Created once per DoFn instance instead of once per element; closed in teardown().
+    private transient DlpServiceClient dlpServiceClient;
+
+    public DeidentifyText(
+        String projectId,
+        String inspectTemplateName,
+        String deidentifyTemplateName,
+        InspectConfig inspectConfig,
+        DeidentifyConfig deidentifyConfig) {
+      this.projectId = projectId;
+      this.inspectTemplateName = inspectTemplateName;
+      this.deidentifyTemplateName = deidentifyTemplateName;
+      this.inspectConfig = inspectConfig;
+      this.deidentifyConfig = deidentifyConfig;
+    }
+
+    /**
+     * Validates the configuration, prepares the request template and opens the DLP client.
+     *
+     * @throws IllegalArgumentException if neither a config nor a template name was provided for
+     *     inspection or for deidentification
+     * @throws IOException if the DLP client cannot be created
+     */
+    @Setup
+    public void setup() throws IOException {
+      // Validate before acquiring any resource so that a failed setup leaves nothing open.
+      if (inspectConfig == null && inspectTemplateName == null) {
+        throw new IllegalArgumentException(
+            "Either inspectConfig or inspectTemplateName need to be set!");
+      }
+      if (deidentifyConfig == null && deidentifyTemplateName == null) {
+        throw new IllegalArgumentException(
+            "Either deidentifyConfig or deidentifyTemplateName need to be set!");
+      }
+      requestBuilder =
+          DeidentifyContentRequest.newBuilder().setParent(ProjectName.of(projectId).toString());
+      if (inspectTemplateName != null) {
+        requestBuilder.setInspectTemplateName(inspectTemplateName);
+      }
+      if (inspectConfig != null) {
+        requestBuilder.setInspectConfig(inspectConfig);
+      }
+      if (deidentifyConfig != null) {
+        requestBuilder.setDeidentifyConfig(deidentifyConfig);
+      }
+      if (deidentifyTemplateName != null) {
+        requestBuilder.setDeidentifyTemplateName(deidentifyTemplateName);
+      }
+      dlpServiceClient = DlpServiceClient.create();
+    }
+
+    /** Closes the DLP client opened in {@link #setup()}. */
+    @Teardown
+    public void teardown() {
+      if (dlpServiceClient != null) {
+        dlpServiceClient.close();
+        dlpServiceClient = null;
+      }
+    }
+
+    /**
+     * Deidentifies the value of the current element. Empty values are skipped; requests larger
+     * than {@link #DLP_PAYLOAD_LIMIT} are logged and skipped.
+     */
+    @ProcessElement
+    public void processElement(ProcessContext c) throws IOException {
+      String text = c.element().getValue();
+      if (text == null || text.isEmpty()) {
+        return;
+      }
+      ContentItem contentItem = ContentItem.newBuilder().setValue(text).build();
+      // Build the request once and reuse it for the size check and for the call itself.
+      DeidentifyContentRequest request = this.requestBuilder.setItem(contentItem).build();
+      int payloadSize = request.getSerializedSize();
+      if (payloadSize > DLP_PAYLOAD_LIMIT) {
+        LOG.error("Payload Size {} Exceeded Batch Size {}", payloadSize, DLP_PAYLOAD_LIMIT);
+        return;
+      }
+      DeidentifyContentResponse response = dlpServiceClient.deidentifyContent(request);
+      c.output(KV.of(c.element().getKey(), response.getItem().getValue()));
+    }
+  }
+}
diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPInspectText.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPInspectText.java
new file mode 100644
index 0000000..e63bfe3
--- /dev/null
+++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPInspectText.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.dlp.v2.DlpServiceClient;
+import com.google.privacy.dlp.v2.ContentItem;
+import com.google.privacy.dlp.v2.Finding;
+import com.google.privacy.dlp.v2.InspectConfig;
+import com.google.privacy.dlp.v2.InspectContentRequest;
+import com.google.privacy.dlp.v2.InspectContentResponse;
+import com.google.privacy.dlp.v2.ProjectName;
+import java.io.IOException;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} connecting to Cloud DLP and inspecting text for identifying data according
+ * to provided settings.
+ *
+ * <p>Either inspectTemplateName (String) or inspectConfig ({@link InspectConfig}) needs to be
+ * set.
+ *
+ * <p>Batch size defines how big are batches sent to DLP at once in bytes.
+ */
+@Experimental
+@AutoValue
+public abstract class DLPInspectText
+    extends PTransform<PCollection<KV<String, String>>, PCollection<List<Finding>>> {
+  public static final Logger LOG = LoggerFactory.getLogger(DLPInspectText.class);
+
+  /** Requests whose serialized size exceeds this number of bytes are not sent to DLP. */
+  public static final Integer DLP_PAYLOAD_LIMIT = 52400;
+
+  @Nullable
+  public abstract String inspectTemplateName();
+
+  @Nullable
+  public abstract InspectConfig inspectConfig();
+
+  public abstract Integer batchSize();
+
+  public abstract String projectId();
+
+  /** Builder for {@link DLPInspectText}. */
+  @AutoValue.Builder
+  public abstract static class Builder {
+    public abstract Builder setInspectTemplateName(String inspectTemplateName);
+
+    public abstract Builder setInspectConfig(InspectConfig inspectConfig);
+
+    public abstract Builder setBatchSize(Integer batchSize);
+
+    public abstract Builder setProjectId(String projectId);
+
+    public abstract DLPInspectText build();
+  }
+
+  public static Builder newBuilder() {
+    return new AutoValue_DLPInspectText.Builder();
+  }
+
+  /**
+   * The transform batches the contents of input PCollection and then calls Cloud DLP service to
+   * perform the inspection.
+   *
+   * @param input input PCollection
+   * @return PCollection with one list of findings per inspected batch
+   */
+  @Override
+  public PCollection<List<Finding>> expand(PCollection<KV<String, String>> input) {
+    return input
+        .apply("Batch Contents", ParDo.of(new BatchRequestForDLP(batchSize())))
+        .apply(
+            "DLPInspect",
+            ParDo.of(new InspectData(projectId(), inspectTemplateName(), inspectConfig())));
+  }
+
+  /** Sends every non-empty input value to Cloud DLP and outputs the findings it reports. */
+  public static class InspectData extends DoFn<KV<String, String>, List<Finding>> {
+    private final String projectId;
+    private final String inspectTemplateName;
+    private final InspectConfig inspectConfig;
+    private transient InspectContentRequest.Builder requestBuilder;
+    // Opened on first use and reused for later elements; closed in teardown().
+    private transient DlpServiceClient dlpServiceClient;
+    private final Counter numberOfBytesInspected =
+        Metrics.counter(InspectData.class, "NumberOfBytesInspected");
+
+    public InspectData(String projectId, String inspectTemplateName, InspectConfig inspectConfig) {
+      this.projectId = projectId;
+      this.inspectTemplateName = inspectTemplateName;
+      this.inspectConfig = inspectConfig;
+    }
+
+    /**
+     * Validates the configuration and prepares the request template.
+     *
+     * @throws IllegalArgumentException if neither inspectConfig nor inspectTemplateName is set
+     */
+    @Setup
+    public void setup() {
+      if (inspectTemplateName == null && inspectConfig == null) {
+        throw new IllegalArgumentException(
+            "Either inspectConfig or inspectTemplateName need to be set!");
+      }
+      this.requestBuilder =
+          InspectContentRequest.newBuilder().setParent(ProjectName.of(this.projectId).toString());
+      if (inspectTemplateName != null) {
+        requestBuilder.setInspectTemplateName(this.inspectTemplateName);
+      }
+      if (inspectConfig != null) {
+        requestBuilder.setInspectConfig(inspectConfig);
+      }
+    }
+
+    /** Closes the DLP client if one was opened while processing elements. */
+    @Teardown
+    public void teardown() {
+      if (dlpServiceClient != null) {
+        dlpServiceClient.close();
+        dlpServiceClient = null;
+      }
+    }
+
+    /**
+     * Inspects the value of the current element. Empty values are skipped; requests larger than
+     * {@link #DLP_PAYLOAD_LIMIT} are logged and skipped.
+     */
+    @ProcessElement
+    public void processElement(ProcessContext c) throws IOException {
+      String text = c.element().getValue();
+      if (text == null || text.isEmpty()) {
+        return;
+      }
+      ContentItem contentItem = ContentItem.newBuilder().setValue(text).build();
+      // Build the request once and reuse it for the size check and for the call itself.
+      InspectContentRequest request = this.requestBuilder.setItem(contentItem).build();
+      int payloadSize = request.getSerializedSize();
+      if (payloadSize > DLP_PAYLOAD_LIMIT) {
+        LOG.error("Payload Size {} Exceeded Batch Size {}", payloadSize, DLP_PAYLOAD_LIMIT);
+        return;
+      }
+      // The client is created here rather than in setup() because setup() does not declare
+      // IOException and its signature is kept unchanged.
+      if (dlpServiceClient == null) {
+        dlpServiceClient = DlpServiceClient.create();
+      }
+      InspectContentResponse response = dlpServiceClient.inspectContent(request);
+      List<Finding> findingsList = response.getResult().getFindingsList();
+      c.output(findingsList);
+      numberOfBytesInspected.inc(contentItem.getSerializedSize());
+    }
+  }
+}
diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPReidentifyText.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPReidentifyText.java
new file mode 100644
index 0000000..e841d9a
--- /dev/null
+++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPReidentifyText.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.dlp.v2.DlpServiceClient;
+import com.google.privacy.dlp.v2.ContentItem;
+import com.google.privacy.dlp.v2.DeidentifyConfig;
+import com.google.privacy.dlp.v2.InspectConfig;
+import com.google.privacy.dlp.v2.ProjectName;
+import com.google.privacy.dlp.v2.ReidentifyContentRequest;
+import com.google.privacy.dlp.v2.ReidentifyContentResponse;
+import java.io.IOException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} connecting to Cloud DLP and reidentifying previously deidentified text
+ * according to provided settings.
+ *
+ * <p>Either inspectTemplateName (String) or inspectConfig ({@link InspectConfig}) needs to be
+ * set, the same goes for reidentifyTemplateName or reidentifyConfig.
+ *
+ * <p>Batch size defines how big are batches sent to DLP at once in bytes.
+ */
+@Experimental
+@AutoValue
+public abstract class DLPReidentifyText
+    extends PTransform<PCollection<KV<String, String>>, PCollection<KV<String, String>>> {
+
+  // Bound to this class so that log records are attributed to the reidentify transform.
+  public static final Logger LOG = LoggerFactory.getLogger(DLPReidentifyText.class);
+
+  /** Requests whose serialized size exceeds this number of bytes are not sent to DLP. */
+  public static final Integer DLP_PAYLOAD_LIMIT = 52400;
+
+  @Nullable
+  public abstract String inspectTemplateName();
+
+  @Nullable
+  public abstract String reidentifyTemplateName();
+
+  @Nullable
+  public abstract InspectConfig inspectConfig();
+
+  @Nullable
+  public abstract DeidentifyConfig reidentifyConfig();
+
+  public abstract Integer batchSize();
+
+  public abstract String projectId();
+
+  /** Builder for {@link DLPReidentifyText}. */
+  @AutoValue.Builder
+  public abstract static class Builder {
+    public abstract Builder setInspectTemplateName(String inspectTemplateName);
+
+    public abstract Builder setInspectConfig(InspectConfig inspectConfig);
+
+    public abstract Builder setReidentifyConfig(DeidentifyConfig reidentifyConfig);
+
+    public abstract Builder setReidentifyTemplateName(String reidentifyTemplateName);
+
+    public abstract Builder setBatchSize(Integer batchSize);
+
+    public abstract Builder setProjectId(String projectId);
+
+    public abstract DLPReidentifyText build();
+  }
+
+  public static DLPReidentifyText.Builder newBuilder() {
+    return new AutoValue_DLPReidentifyText.Builder();
+  }
+
+  /**
+   * The transform batches the contents of input PCollection and then calls Cloud DLP service to
+   * perform the reidentification.
+   *
+   * @param input input PCollection
+   * @return PCollection after transformations
+   */
+  @Override
+  public PCollection<KV<String, String>> expand(PCollection<KV<String, String>> input) {
+    return input
+        .apply("Batch Contents", ParDo.of(new BatchRequestForDLP(batchSize())))
+        .apply(
+            // Named after the operation performed; the step was previously labelled
+            // "DLPDeidentify" although it reidentifies.
+            "DLPReidentify",
+            ParDo.of(
+                new ReidentifyText(
+                    projectId(),
+                    inspectTemplateName(),
+                    reidentifyTemplateName(),
+                    inspectConfig(),
+                    reidentifyConfig())));
+  }
+
+  /** Sends every non-empty input value to Cloud DLP and outputs the reidentified text. */
+  public static class ReidentifyText extends DoFn<KV<String, String>, KV<String, String>> {
+    private final String projectId;
+    private final String inspectTemplateName;
+    private final String reidentifyTemplateName;
+    private final InspectConfig inspectConfig;
+    private final DeidentifyConfig reidentifyConfig;
+    private transient ReidentifyContentRequest.Builder requestBuilder;
+    // Created once per DoFn instance instead of once per element; closed in teardown().
+    private transient DlpServiceClient dlpServiceClient;
+
+    public ReidentifyText(
+        String projectId,
+        String inspectTemplateName,
+        String reidentifyTemplateName,
+        InspectConfig inspectConfig,
+        DeidentifyConfig reidentifyConfig) {
+      this.projectId = projectId;
+      this.inspectTemplateName = inspectTemplateName;
+      this.reidentifyTemplateName = reidentifyTemplateName;
+      this.inspectConfig = inspectConfig;
+      this.reidentifyConfig = reidentifyConfig;
+    }
+
+    /**
+     * Validates the configuration, prepares the request template and opens the DLP client.
+     *
+     * @throws IllegalArgumentException if neither a config nor a template name was provided for
+     *     inspection or for reidentification
+     * @throws IOException if the DLP client cannot be created
+     */
+    @Setup
+    public void setup() throws IOException {
+      // Validate before acquiring any resource so that a failed setup leaves nothing open.
+      if (inspectConfig == null && inspectTemplateName == null) {
+        throw new IllegalArgumentException(
+            "Either inspectConfig or inspectTemplateName need to be set!");
+      }
+      if (reidentifyConfig == null && reidentifyTemplateName == null) {
+        throw new IllegalArgumentException(
+            "Either reidentifyConfig or reidentifyTemplateName need to be set!");
+      }
+      requestBuilder =
+          ReidentifyContentRequest.newBuilder().setParent(ProjectName.of(projectId).toString());
+      if (inspectTemplateName != null) {
+        requestBuilder.setInspectTemplateName(inspectTemplateName);
+      }
+      if (inspectConfig != null) {
+        requestBuilder.setInspectConfig(inspectConfig);
+      }
+      if (reidentifyConfig != null) {
+        requestBuilder.setReidentifyConfig(reidentifyConfig);
+      }
+      if (reidentifyTemplateName != null) {
+        requestBuilder.setReidentifyTemplateName(reidentifyTemplateName);
+      }
+      dlpServiceClient = DlpServiceClient.create();
+    }
+
+    /** Closes the DLP client opened in {@link #setup()}. */
+    @Teardown
+    public void teardown() {
+      if (dlpServiceClient != null) {
+        dlpServiceClient.close();
+        dlpServiceClient = null;
+      }
+    }
+
+    /**
+     * Reidentifies the value of the current element. Empty values are skipped; requests larger
+     * than {@link #DLP_PAYLOAD_LIMIT} are logged and skipped.
+     */
+    @ProcessElement
+    public void processElement(ProcessContext c) throws IOException {
+      String text = c.element().getValue();
+      if (text == null || text.isEmpty()) {
+        return;
+      }
+      ContentItem contentItem = ContentItem.newBuilder().setValue(text).build();
+      // Build the request once and reuse it for the size check and for the call itself.
+      ReidentifyContentRequest request = this.requestBuilder.setItem(contentItem).build();
+      int payloadSize = request.getSerializedSize();
+      if (payloadSize > DLP_PAYLOAD_LIMIT) {
+        LOG.error("Payload Size {} Exceeded Batch Size {}", payloadSize, DLP_PAYLOAD_LIMIT);
+        return;
+      }
+      ReidentifyContentResponse response = dlpServiceClient.reidentifyContent(request);
+      c.output(KV.of(c.element().getKey(), response.getItem().getValue()));
+    }
+  }
+}
diff --git a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DLPTextOperationsIT.java b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DLPTextOperationsIT.java
new file mode 100644
index 0000000..5401105
--- /dev/null
+++ b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DLPTextOperationsIT.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.privacy.dlp.v2.CharacterMaskConfig;
+import com.google.privacy.dlp.v2.DeidentifyConfig;
+import com.google.privacy.dlp.v2.Finding;
+import com.google.privacy.dlp.v2.InfoType;
+import com.google.privacy.dlp.v2.InfoTypeTransformations;
+import com.google.privacy.dlp.v2.InspectConfig;
+import com.google.privacy.dlp.v2.Likelihood;
+import com.google.privacy.dlp.v2.PrimitiveTransformation;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class DLPTextOperationsIT {
+  @Rule public TestPipeline testPipeline = TestPipeline.create();
+
+  private static final String IDENTIFYING_TEXT = "mary.sue@example.com";
+  private static InfoType emailAddress = InfoType.newBuilder().setName("EMAIL_ADDRESS").build();;
+  private static InspectConfig inspectConfig =
+      InspectConfig.newBuilder()
+          .addInfoTypes(emailAddress)
+          .setMinLikelihood(Likelihood.LIKELY)
+          .build();
+
+  @Test
+  public void inspectsText() {
+    String projectId = testPipeline.getOptions().as(GcpOptions.class).getProject();
+    PCollection<List<Finding>> inspectionResult =
+        testPipeline
+            .apply(Create.of(KV.of("", IDENTIFYING_TEXT)))
+            .apply(
+                DLPInspectText.newBuilder()
+                    .setBatchSize(52400)
+                    .setProjectId(projectId)
+                    .setInspectConfig(inspectConfig)
+                    .build());
+    PAssert.that(inspectionResult).satisfies(new VerifyInspectionResult());
+    testPipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void deidentifiesText() {
+    emailAddress = InfoType.newBuilder().setName("EMAIL_ADDRESS").build();
+    String projectId = testPipeline.getOptions().as(GcpOptions.class).getProject();
+
+    PCollection<KV<String, String>> deidentificationResult =
+        testPipeline
+            .apply(Create.of(KV.of("", IDENTIFYING_TEXT)))
+            .apply(
+                DLPDeidentifyText.newBuilder()
+                    .setBatchSize(52400)
+                    .setProjectId(projectId)
+                    .setInspectConfig(inspectConfig)
+                    .setDeidentifyConfig(getDeidentifyConfig())
+                    .build());
+    PAssert.that(deidentificationResult).containsInAnyOrder(KV.of("", "####################"));
+    testPipeline.run().waitUntilFinish();
+  }
+
+  private DeidentifyConfig getDeidentifyConfig() {
+    CharacterMaskConfig characterMaskConfig =
+        CharacterMaskConfig.newBuilder().setMaskingCharacter("#").build();
+    PrimitiveTransformation primitiveTransformation =
+        PrimitiveTransformation.newBuilder().setCharacterMaskConfig(characterMaskConfig).build();
+    InfoTypeTransformations.InfoTypeTransformation infoTypeTransformation =
+        InfoTypeTransformations.InfoTypeTransformation.newBuilder()
+            .addInfoTypes(emailAddress)
+            .setPrimitiveTransformation(primitiveTransformation)
+            .build();
+    return DeidentifyConfig.newBuilder()
+        .setInfoTypeTransformations(
+            InfoTypeTransformations.newBuilder().addTransformations(infoTypeTransformation).build())
+        .build();
+  }
+
+  private static class VerifyInspectionResult
+      implements SerializableFunction<Iterable<List<Finding>>, Void> {
+    @Override
+    public Void apply(Iterable<List<Finding>> input) {
+      List<Boolean> matches = new ArrayList<>();
+      input.forEach(
+          resultList ->
+              matches.add(
+                  resultList.stream()
+                      .anyMatch(finding -> finding.getInfoType().equals(emailAddress))));
+      assertEquals(Boolean.TRUE, matches.contains(Boolean.TRUE));
+      return null;
+    }
+  }
+}