You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mw...@apache.org on 2020/04/29 09:40:18 UTC

[beam] 01/01: [BEAM-9723] Add DLP integration transforms

This is an automated email from the ASF dual-hosted git repository.

mwalenia pushed a commit to branch BEAM-9723-java-dlp
in repository https://gitbox.apache.org/repos/asf/beam.git

commit a6c0dc3024f1defc1f659b2371f516925bc9d7f2
Author: Michal Walenia <mi...@polidea.com>
AuthorDate: Mon Apr 27 10:04:12 2020 +0200

    [BEAM-9723] Add DLP integration transforms
---
 sdks/java/extensions/ml/build.gradle               |   5 +-
 .../beam/sdk/extensions/ml/BatchRequestForDLP.java | 119 +++++++++++++
 .../beam/sdk/extensions/ml/DLPDeidentifyText.java  | 186 +++++++++++++++++++++
 .../beam/sdk/extensions/ml/DLPInspectText.java     | 147 ++++++++++++++++
 .../beam/sdk/extensions/ml/DLPReidentifyText.java  | 179 ++++++++++++++++++++
 .../sdk/extensions/ml/DLPTextOperationsIT.java     | 121 ++++++++++++++
 6 files changed, 756 insertions(+), 1 deletion(-)

diff --git a/sdks/java/extensions/ml/build.gradle b/sdks/java/extensions/ml/build.gradle
index 274c074..6f9b567 100644
--- a/sdks/java/extensions/ml/build.gradle
+++ b/sdks/java/extensions/ml/build.gradle
@@ -26,10 +26,13 @@ description = 'Apache Beam :: SDKs :: Java :: Extensions :: ML'
 dependencies {
     compile project(path: ":sdks:java:core", configuration: "shadow")
     compile project(":sdks:java:expansion-service")
-    testCompile project(path: ':sdks:java:core', configuration: 'shadowTest')
     compile 'com.google.cloud:google-cloud-video-intelligence:1.2.0'
+    compile 'com.google.cloud:google-cloud-dlp:1.1.1'
+    testCompile project(path: ':sdks:java:core', configuration: 'shadowTest')
     testCompile library.java.mockito_core
     testCompile 'com.google.cloud:google-cloud-video-intelligence:1.2.0'
+    testCompile 'com.google.cloud:google-cloud-dlp:1.1.1'
+    testCompile project(path: ":sdks:java:extensions:google-cloud-platform-core", configuration: "testRuntime")
     testCompile library.java.junit
     testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
     testRuntimeOnly project(":runners:google-cloud-dataflow-java")
diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/BatchRequestForDLP.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/BatchRequestForDLP.java
new file mode 100644
index 0000000..9a2d4d2
--- /dev/null
+++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/BatchRequestForDLP.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.KV;
+import org.joda.time.Instant;
+
+/**
+ * DoFn batching the input PCollection into bigger requests in order to better utilize the Cloud DLP
+ * service.
+ */
+class BatchRequestForDLP extends DoFn<KV<String, String>, KV<String, String>> {
+  private final Integer batchSize;
+  public static final Integer DLP_PAYLOAD_LIMIT = 52400;
+
+  public BatchRequestForDLP(Integer batchSize) {
+    if (batchSize > DLP_PAYLOAD_LIMIT) {
+      throw new IllegalArgumentException(
+          "DLP batch size exceeds payload limit.\n"
+              + "Batch size should be smaller than "
+              + DLP_PAYLOAD_LIMIT);
+    }
+    this.batchSize = batchSize;
+  }
+
+  @StateId("elementsBag")
+  private final StateSpec<BagState<KV<String, String>>> elementsBag = StateSpecs.bag();
+
+  @StateId("elementsSize")
+  private final StateSpec<ValueState<Integer>> elementsSize = StateSpecs.value();
+
+  @TimerId("eventTimer")
+  private final TimerSpec eventTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+  @ProcessElement
+  public void process(
+      @Element KV<String, String> element,
+      @StateId("elementsBag") BagState<KV<String, String>> elementsBag,
+      @StateId("elementsSize") ValueState<Integer> elementsSize,
+      @Timestamp Instant elementTs,
+      @TimerId("eventTimer") Timer eventTimer,
+      OutputReceiver<KV<String, String>> output) {
+    eventTimer.set(elementTs);
+    Integer currentElementSize =
+        (element.getValue() == null) ? 0 : element.getValue().getBytes(UTF_8).length;
+    Integer currentBufferSize = (elementsSize.read() == null) ? 0 : elementsSize.read();
+    boolean clearBuffer = (currentElementSize + currentBufferSize) > batchSize;
+    if (clearBuffer) {
+      KV<String, String> inspectBufferedData = emitResult(elementsBag.read());
+      output.output(inspectBufferedData);
+      DLPInspectText.LOG.info(
+          "****CLEAR BUFFER Key {} **** Current Content Size {}",
+          inspectBufferedData.getKey(),
+          inspectBufferedData.getValue().getBytes(UTF_8).length);
+      clearState(elementsBag, elementsSize);
+    } else {
+      elementsBag.add(element);
+      elementsSize.write(currentElementSize + currentBufferSize);
+    }
+  }
+
+  @OnTimer("eventTimer")
+  public void onTimer(
+      @StateId("elementsBag") BagState<KV<String, String>> elementsBag,
+      @StateId("elementsSize") ValueState<Integer> elementsSize,
+      OutputReceiver<KV<String, String>> output) {
+    // Process left over records less than  batch size
+    KV<String, String> inspectBufferedData = emitResult(elementsBag.read());
+    output.output(inspectBufferedData);
+    DLPInspectText.LOG.info(
+        "****Timer Triggered Key {} **** Current Content Size {}",
+        inspectBufferedData.getKey(),
+        inspectBufferedData.getValue().getBytes(UTF_8).length);
+    clearState(elementsBag, elementsSize);
+  }
+
+  private static KV<String, String> emitResult(Iterable<KV<String, String>> bufferData) {
+    StringBuilder builder = new StringBuilder();
+    String fileName =
+        (bufferData.iterator().hasNext()) ? bufferData.iterator().next().getKey() : "UNKNOWN_FILE";
+    bufferData.forEach(
+        e -> {
+          builder.append(e.getValue());
+        });
+    return KV.of(fileName, builder.toString());
+  }
+
+  private static void clearState(
+      BagState<KV<String, String>> elementsBag, ValueState<Integer> elementsSize) {
+    elementsBag.clear();
+    elementsSize.clear();
+  }
+}
diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyText.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyText.java
new file mode 100644
index 0000000..b7e037a
--- /dev/null
+++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyText.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.dlp.v2.DlpServiceClient;
+import com.google.privacy.dlp.v2.ContentItem;
+import com.google.privacy.dlp.v2.DeidentifyConfig;
+import com.google.privacy.dlp.v2.DeidentifyContentRequest;
+import com.google.privacy.dlp.v2.DeidentifyContentResponse;
+import com.google.privacy.dlp.v2.InspectConfig;
+import com.google.privacy.dlp.v2.ProjectName;
+import java.io.IOException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} connecting to Cloud DLP and deidentifying text according to provided
+ * settings.
+ *
+ * <p>Either inspectTemplateName (String) or inspectConfig {@link InspectConfig} need to be set. The
+ * situation is the same with deidentifyTemplateName and deidentifyConfig ({@link DeidentifyConfig}.
+ *
+ * <p>Batch size defines how big are batches sent to DLP at once in bytes.
+ */
+@Experimental
+@AutoValue
+public abstract class DLPDeidentifyText
+    extends PTransform<PCollection<KV<String, String>>, PCollection<KV<String, String>>> {
+
+  public static final Logger LOG = LoggerFactory.getLogger(DLPInspectText.class);
+
+  public static final Integer DLP_PAYLOAD_LIMIT = 52400;
+
+  @Nullable
+  public abstract String inspectTemplateName();
+
+  @Nullable
+  public abstract String deidentifyTemplateName();
+
+  @Nullable
+  public abstract InspectConfig inspectConfig();
+
+  @Nullable
+  public abstract DeidentifyConfig deidentifyConfig();
+
+  public abstract Integer batchSize();
+
+  public abstract String projectId();
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+    public abstract Builder setInspectTemplateName(String inspectTemplateName);
+
+    public abstract Builder setBatchSize(Integer batchSize);
+
+    public abstract Builder setProjectId(String projectId);
+
+    public abstract Builder setDeidentifyTemplateName(String deidentifyTemplateName);
+
+    public abstract Builder setInspectConfig(InspectConfig inspectConfig);
+
+    public abstract Builder setDeidentifyConfig(DeidentifyConfig deidentifyConfig);
+
+    public abstract DLPDeidentifyText build();
+  }
+
+  public static DLPDeidentifyText.Builder newBuilder() {
+    return new AutoValue_DLPDeidentifyText.Builder();
+  }
+
+  /**
+   * The transform batches the contents of input PCollection and then calls Cloud DLP service to
+   * perform the deidentification.
+   *
+   * @param input input PCollection
+   * @return PCollection after transformations
+   */
+  @Override
+  public PCollection<KV<String, String>> expand(PCollection<KV<String, String>> input) {
+    return input
+        .apply("Batch Contents", ParDo.of(new BatchRequestForDLP(batchSize())))
+        .apply(
+            "DLPDeidentify",
+            ParDo.of(
+                new DeidentifyText(
+                    projectId(),
+                    inspectTemplateName(),
+                    deidentifyTemplateName(),
+                    inspectConfig(),
+                    deidentifyConfig())));
+  }
+
+  static class DeidentifyText extends DoFn<KV<String, String>, KV<String, String>> {
+    private final String projectId;
+    private final String inspectTemplateName;
+    private final String deidentifyTemplateName;
+    private final InspectConfig inspectConfig;
+    private final DeidentifyConfig deidentifyConfig;
+    private transient DeidentifyContentRequest.Builder requestBuilder;
+
+    @Setup
+    public void setup() throws IOException {
+      requestBuilder =
+          DeidentifyContentRequest.newBuilder().setParent(ProjectName.of(projectId).toString());
+      if (inspectTemplateName != null) {
+        requestBuilder.setInspectTemplateName(inspectTemplateName);
+      }
+      if (inspectConfig != null) {
+        requestBuilder.setInspectConfig(inspectConfig);
+      }
+      if (inspectConfig == null && inspectTemplateName == null) {
+        throw new IllegalArgumentException(
+            "Either inspectConfig or inspectTemplateName need to be set!");
+      }
+      if (deidentifyConfig != null) {
+        requestBuilder.setDeidentifyConfig(deidentifyConfig);
+      }
+      if (deidentifyTemplateName != null) {
+        requestBuilder.setDeidentifyTemplateName(deidentifyTemplateName);
+      }
+      if (deidentifyConfig == null && deidentifyTemplateName == null) {
+        throw new IllegalArgumentException(
+            "Either deidentifyConfig or deidentifyTemplateName need to be set!");
+      }
+    }
+
+    public DeidentifyText(
+        String projectId,
+        String inspectTemplateName,
+        String deidentifyTemplateName,
+        InspectConfig inspectConfig,
+        DeidentifyConfig deidentifyConfig) {
+      this.projectId = projectId;
+      this.inspectTemplateName = inspectTemplateName;
+      this.deidentifyTemplateName = deidentifyTemplateName;
+      this.inspectConfig = inspectConfig;
+      this.deidentifyConfig = deidentifyConfig;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws IOException {
+      try (DlpServiceClient dlpServiceClient = DlpServiceClient.create()) {
+        if (!c.element().getValue().isEmpty()) {
+          ContentItem contentItem =
+              ContentItem.newBuilder().setValue(c.element().getValue()).build();
+          this.requestBuilder.setItem(contentItem);
+          if (this.requestBuilder.build().getSerializedSize() > DLP_PAYLOAD_LIMIT) {
+            String errorMessage =
+                String.format(
+                    "Payload Size %s Exceeded Batch Size %s",
+                    this.requestBuilder.build().getSerializedSize(), DLP_PAYLOAD_LIMIT);
+            LOG.error(errorMessage);
+          } else {
+            DeidentifyContentResponse response =
+                dlpServiceClient.deidentifyContent(this.requestBuilder.build());
+            response.getItem().getValue();
+            c.output(KV.of(c.element().getKey(), response.getItem().getValue()));
+          }
+        }
+      }
+    }
+  }
+}
diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPInspectText.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPInspectText.java
new file mode 100644
index 0000000..e63bfe3
--- /dev/null
+++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPInspectText.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.dlp.v2.DlpServiceClient;
+import com.google.privacy.dlp.v2.ContentItem;
+import com.google.privacy.dlp.v2.Finding;
+import com.google.privacy.dlp.v2.InspectConfig;
+import com.google.privacy.dlp.v2.InspectContentRequest;
+import com.google.privacy.dlp.v2.InspectContentResponse;
+import com.google.privacy.dlp.v2.ProjectName;
+import java.io.IOException;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} connecting to Cloud DLP and inspecting text for identifying data according
+ * to provided settings.
+ *
+ * <p>Either inspectTemplateName (String) or inspectConfig {@link InspectConfig} need to be set.
+ *
+ * <p>Batch size defines how big are batches sent to DLP at once in bytes.
+ */
+@Experimental
+@AutoValue
+public abstract class DLPInspectText
+    extends PTransform<PCollection<KV<String, String>>, PCollection<List<Finding>>> {
+  public static final Logger LOG = LoggerFactory.getLogger(DLPInspectText.class);
+
+  public static final Integer DLP_PAYLOAD_LIMIT = 52400;
+
+  @Nullable
+  public abstract String inspectTemplateName();
+
+  @Nullable
+  public abstract InspectConfig inspectConfig();
+
+  public abstract Integer batchSize();
+
+  public abstract String projectId();
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+    public abstract Builder setInspectTemplateName(String inspectTemplateName);
+
+    public abstract Builder setInspectConfig(InspectConfig inspectConfig);
+
+    public abstract Builder setBatchSize(Integer batchSize);
+
+    public abstract Builder setProjectId(String projectId);
+
+    public abstract DLPInspectText build();
+  }
+
+  public static Builder newBuilder() {
+    return new AutoValue_DLPInspectText.Builder();
+  }
+
+  @Override
+  public PCollection<List<Finding>> expand(PCollection<KV<String, String>> input) {
+    return input
+        .apply("Batch Contents", ParDo.of(new BatchRequestForDLP(batchSize())))
+        .apply(
+            "DLPInspect",
+            ParDo.of(new InspectData(projectId(), inspectTemplateName(), inspectConfig())));
+  }
+
+  public static class InspectData extends DoFn<KV<String, String>, List<Finding>> {
+    private final String projectId;
+    private final String inspectTemplateName;
+    private final InspectConfig inspectConfig;
+    private transient InspectContentRequest.Builder requestBuilder;
+    private final Counter numberOfBytesInspected =
+        Metrics.counter(InspectData.class, "NumberOfBytesInspected");
+
+    public InspectData(String projectId, String inspectTemplateName, InspectConfig inspectConfig) {
+      this.projectId = projectId;
+      this.inspectTemplateName = inspectTemplateName;
+      this.inspectConfig = inspectConfig;
+    }
+
+    @Setup
+    public void setup() {
+      this.requestBuilder =
+          InspectContentRequest.newBuilder().setParent(ProjectName.of(this.projectId).toString());
+      if (inspectTemplateName != null) {
+        requestBuilder.setInspectTemplateName(this.inspectTemplateName);
+      }
+      if (inspectConfig != null) {
+        requestBuilder.setInspectConfig(inspectConfig);
+      }
+      if (inspectTemplateName == null && inspectConfig == null) {
+        throw new IllegalArgumentException("");
+      }
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws IOException {
+      try (DlpServiceClient dlpServiceClient = DlpServiceClient.create()) {
+        if (!c.element().getValue().isEmpty()) {
+          ContentItem contentItem =
+              ContentItem.newBuilder().setValue(c.element().getValue()).build();
+          this.requestBuilder.setItem(contentItem);
+          if (this.requestBuilder.build().getSerializedSize() > DLP_PAYLOAD_LIMIT) {
+            String errorMessage =
+                String.format(
+                    "Payload Size %s Exceeded Batch Size %s",
+                    this.requestBuilder.build().getSerializedSize(), DLP_PAYLOAD_LIMIT);
+            LOG.error(errorMessage);
+          } else {
+            InspectContentResponse response =
+                dlpServiceClient.inspectContent(this.requestBuilder.build());
+            List<Finding> findingsList = response.getResult().getFindingsList();
+            c.output(findingsList);
+            numberOfBytesInspected.inc(contentItem.getSerializedSize());
+          }
+        }
+      }
+    }
+  }
+}
diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPReidentifyText.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPReidentifyText.java
new file mode 100644
index 0000000..e841d9a
--- /dev/null
+++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPReidentifyText.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.dlp.v2.DlpServiceClient;
+import com.google.privacy.dlp.v2.ContentItem;
+import com.google.privacy.dlp.v2.DeidentifyConfig;
+import com.google.privacy.dlp.v2.InspectConfig;
+import com.google.privacy.dlp.v2.ProjectName;
+import com.google.privacy.dlp.v2.ReidentifyContentRequest;
+import com.google.privacy.dlp.v2.ReidentifyContentResponse;
+import java.io.IOException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} connecting to Cloud DLP and reidentifying text according to provided
+ * settings.
+ *
+ * <p>Either inspectTemplateName (String) or inspectConfig {@link InspectConfig} need to be set, the
+ * same goes for reidentifyTemplateName or reidentifyConfig ({@link DeidentifyConfig}).
+ *
+ * <p>Batch size defines the maximum size, in bytes, of the batches sent to DLP at once.
+ */
+@Experimental
+@AutoValue
+public abstract class DLPReidentifyText
+    extends PTransform<PCollection<KV<String, String>>, PCollection<KV<String, String>>> {
+
+  public static final Logger LOG = LoggerFactory.getLogger(DLPReidentifyText.class);
+
+  /** Maximum serialized request size (bytes) sent to DLP in a single call. */
+  public static final Integer DLP_PAYLOAD_LIMIT = BatchRequestForDLP.DLP_PAYLOAD_LIMIT;
+
+  @Nullable
+  public abstract String inspectTemplateName();
+
+  @Nullable
+  public abstract String reidentifyTemplateName();
+
+  @Nullable
+  public abstract InspectConfig inspectConfig();
+
+  @Nullable
+  public abstract DeidentifyConfig reidentifyConfig();
+
+  public abstract Integer batchSize();
+
+  public abstract String projectId();
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+    public abstract Builder setInspectTemplateName(String inspectTemplateName);
+
+    public abstract Builder setInspectConfig(InspectConfig inspectConfig);
+
+    public abstract Builder setReidentifyConfig(DeidentifyConfig reidentifyConfig);
+
+    public abstract Builder setReidentifyTemplateName(String reidentifyTemplateName);
+
+    public abstract Builder setBatchSize(Integer batchSize);
+
+    public abstract Builder setProjectId(String projectId);
+
+    public abstract DLPReidentifyText build();
+  }
+
+  public static DLPReidentifyText.Builder newBuilder() {
+    return new AutoValue_DLPReidentifyText.Builder();
+  }
+
+  /**
+   * Batches the input contents and reidentifies each batch with Cloud DLP.
+   *
+   * @throws IllegalArgumentException if neither inspect settings or neither reidentify settings
+   *     are provided; checked here so misconfiguration fails at pipeline construction
+   */
+  @Override
+  public PCollection<KV<String, String>> expand(PCollection<KV<String, String>> input) {
+    if (inspectConfig() == null && inspectTemplateName() == null) {
+      throw new IllegalArgumentException(
+          "Either inspectConfig or inspectTemplateName need to be set!");
+    }
+    if (reidentifyConfig() == null && reidentifyTemplateName() == null) {
+      throw new IllegalArgumentException(
+          "Either reidentifyConfig or reidentifyTemplateName need to be set!");
+    }
+    return input
+        .apply("Batch Contents", ParDo.of(new BatchRequestForDLP(batchSize())))
+        .apply(
+            "DLPReidentify",
+            ParDo.of(
+                new ReidentifyText(
+                    projectId(),
+                    inspectTemplateName(),
+                    reidentifyTemplateName(),
+                    inspectConfig(),
+                    reidentifyConfig())));
+  }
+
+  /** DoFn sending each batch to DLP {@code reidentifyContent} and emitting the result text. */
+  public static class ReidentifyText extends DoFn<KV<String, String>, KV<String, String>> {
+    private final String projectId;
+    private final String inspectTemplateName;
+    private final String reidentifyTemplateName;
+    private final InspectConfig inspectConfig;
+    private final DeidentifyConfig reidentifyConfig;
+    private transient ReidentifyContentRequest.Builder requestBuilder;
+    private transient DlpServiceClient dlpServiceClient;
+
+    public ReidentifyText(
+        String projectId,
+        String inspectTemplateName,
+        String reidentifyTemplateName,
+        InspectConfig inspectConfig,
+        DeidentifyConfig reidentifyConfig) {
+      this.projectId = projectId;
+      this.inspectTemplateName = inspectTemplateName;
+      this.reidentifyTemplateName = reidentifyTemplateName;
+      this.inspectConfig = inspectConfig;
+      this.reidentifyConfig = reidentifyConfig;
+    }
+
+    /** Validates settings, prepares the request template, and opens one DLP client. */
+    @Setup
+    public void setup() throws IOException {
+      if (inspectConfig == null && inspectTemplateName == null) {
+        throw new IllegalArgumentException(
+            "Either inspectConfig or inspectTemplateName need to be set!");
+      }
+      if (reidentifyConfig == null && reidentifyTemplateName == null) {
+        throw new IllegalArgumentException(
+            "Either reidentifyConfig or reidentifyTemplateName need to be set!");
+      }
+      requestBuilder =
+          ReidentifyContentRequest.newBuilder().setParent(ProjectName.of(projectId).toString());
+      if (inspectTemplateName != null) {
+        requestBuilder.setInspectTemplateName(inspectTemplateName);
+      }
+      if (inspectConfig != null) {
+        requestBuilder.setInspectConfig(inspectConfig);
+      }
+      if (reidentifyConfig != null) {
+        requestBuilder.setReidentifyConfig(reidentifyConfig);
+      }
+      if (reidentifyTemplateName != null) {
+        requestBuilder.setReidentifyTemplateName(reidentifyTemplateName);
+      }
+      // One client per DoFn instance instead of creating and closing one for every element.
+      dlpServiceClient = DlpServiceClient.create();
+    }
+
+    /** Closes the DLP client opened in {@link #setup()}. */
+    @Teardown
+    public void teardown() throws IOException {
+      if (dlpServiceClient != null) {
+        dlpServiceClient.close();
+      }
+    }
+
+    /**
+     * Reidentifies one batch. Empty or null values are skipped; requests above {@code
+     * DLP_PAYLOAD_LIMIT} are logged and not sent.
+     */
+    @ProcessElement
+    public void processElement(ProcessContext c) throws IOException {
+      String text = c.element().getValue();
+      if (text == null || text.isEmpty()) {
+        return;
+      }
+      ContentItem contentItem = ContentItem.newBuilder().setValue(text).build();
+      ReidentifyContentRequest request = requestBuilder.setItem(contentItem).build();
+      if (request.getSerializedSize() > DLP_PAYLOAD_LIMIT) {
+        LOG.error(
+            "Payload Size {} Exceeded Batch Size {}",
+            request.getSerializedSize(),
+            DLP_PAYLOAD_LIMIT);
+        return;
+      }
+      ReidentifyContentResponse response = dlpServiceClient.reidentifyContent(request);
+      c.output(KV.of(c.element().getKey(), response.getItem().getValue()));
+    }
+  }
+}
diff --git a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DLPTextOperationsIT.java b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DLPTextOperationsIT.java
new file mode 100644
index 0000000..5401105
--- /dev/null
+++ b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DLPTextOperationsIT.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.privacy.dlp.v2.CharacterMaskConfig;
+import com.google.privacy.dlp.v2.DeidentifyConfig;
+import com.google.privacy.dlp.v2.Finding;
+import com.google.privacy.dlp.v2.InfoType;
+import com.google.privacy.dlp.v2.InfoTypeTransformations;
+import com.google.privacy.dlp.v2.InspectConfig;
+import com.google.privacy.dlp.v2.Likelihood;
+import com.google.privacy.dlp.v2.PrimitiveTransformation;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Integration tests running DLP inspection and deidentification against the live service. */
+@RunWith(JUnit4.class)
+public class DLPTextOperationsIT {
+  @Rule public TestPipeline testPipeline = TestPipeline.create();
+
+  private static final String IDENTIFYING_TEXT = "mary.sue@example.com";
+  private static final InfoType EMAIL_ADDRESS =
+      InfoType.newBuilder().setName("EMAIL_ADDRESS").build();
+  private static final InspectConfig INSPECT_CONFIG =
+      InspectConfig.newBuilder()
+          .addInfoTypes(EMAIL_ADDRESS)
+          .setMinLikelihood(Likelihood.LIKELY)
+          .build();
+
+  @Test
+  public void inspectsText() {
+    String projectId = testPipeline.getOptions().as(GcpOptions.class).getProject();
+    PCollection<List<Finding>> inspectionResult =
+        testPipeline
+            .apply(Create.of(KV.of("", IDENTIFYING_TEXT)))
+            .apply(
+                DLPInspectText.newBuilder()
+                    .setBatchSize(DLPInspectText.DLP_PAYLOAD_LIMIT)
+                    .setProjectId(projectId)
+                    .setInspectConfig(INSPECT_CONFIG)
+                    .build());
+    PAssert.that(inspectionResult).satisfies(new VerifyInspectionResult());
+    testPipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void deidentifiesText() {
+    String projectId = testPipeline.getOptions().as(GcpOptions.class).getProject();
+
+    PCollection<KV<String, String>> deidentificationResult =
+        testPipeline
+            .apply(Create.of(KV.of("", IDENTIFYING_TEXT)))
+            .apply(
+                DLPDeidentifyText.newBuilder()
+                    .setBatchSize(DLPDeidentifyText.DLP_PAYLOAD_LIMIT)
+                    .setProjectId(projectId)
+                    .setInspectConfig(INSPECT_CONFIG)
+                    .setDeidentifyConfig(getDeidentifyConfig())
+                    .build());
+    // Every character of the 20-character e-mail address is masked with '#'.
+    PAssert.that(deidentificationResult).containsInAnyOrder(KV.of("", "####################"));
+    testPipeline.run().waitUntilFinish();
+  }
+
+  /** Builds a config masking every EMAIL_ADDRESS finding with '#' characters. */
+  private DeidentifyConfig getDeidentifyConfig() {
+    CharacterMaskConfig characterMaskConfig =
+        CharacterMaskConfig.newBuilder().setMaskingCharacter("#").build();
+    PrimitiveTransformation primitiveTransformation =
+        PrimitiveTransformation.newBuilder().setCharacterMaskConfig(characterMaskConfig).build();
+    InfoTypeTransformations.InfoTypeTransformation infoTypeTransformation =
+        InfoTypeTransformations.InfoTypeTransformation.newBuilder()
+            .addInfoTypes(EMAIL_ADDRESS)
+            .setPrimitiveTransformation(primitiveTransformation)
+            .build();
+    return DeidentifyConfig.newBuilder()
+        .setInfoTypeTransformations(
+            InfoTypeTransformations.newBuilder().addTransformations(infoTypeTransformation).build())
+        .build();
+  }
+
+  /** Asserts that at least one emitted findings list contains an EMAIL_ADDRESS finding. */
+  private static class VerifyInspectionResult
+      implements SerializableFunction<Iterable<List<Finding>>, Void> {
+    @Override
+    public Void apply(Iterable<List<Finding>> input) {
+      List<Boolean> matches = new ArrayList<>();
+      input.forEach(
+          resultList ->
+              matches.add(
+                  resultList.stream()
+                      .anyMatch(finding -> finding.getInfoType().equals(EMAIL_ADDRESS))));
+      assertEquals(Boolean.TRUE, matches.contains(Boolean.TRUE));
+      return null;
+    }
+  }
+}