You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mw...@apache.org on 2020/06/02 06:56:44 UTC

[beam] branch master updated: [BEAM-9723] Add DLP integration transforms (#11566)

This is an automated email from the ASF dual-hosted git repository.

mwalenia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b6ca2ab  [BEAM-9723] Add DLP integration transforms (#11566)
b6ca2ab is described below

commit b6ca2aba5a0141eed5bed29a9948e2c65874254f
Author: Michal Walenia <32...@users.noreply.github.com>
AuthorDate: Tue Jun 2 08:56:14 2020 +0200

    [BEAM-9723] Add DLP integration transforms (#11566)
    
    [BEAM-9723] Add Google Cloud DLP integration transforms.
    
    * DLPDeidentifyText
    * DLPReidentifyText
    * DLPInspectText
    
    are now available.
---
 CHANGES.md                                         |   4 +
 sdks/java/extensions/ml/build.gradle               |  15 +-
 .../beam/sdk/extensions/ml/BatchRequestForDLP.java | 113 ++++++++
 .../beam/sdk/extensions/ml/DLPDeidentifyText.java  | 282 ++++++++++++++++++++
 .../beam/sdk/extensions/ml/DLPInspectText.java     | 240 +++++++++++++++++
 .../beam/sdk/extensions/ml/DLPReidentifyText.java  | 286 +++++++++++++++++++++
 .../beam/sdk/extensions/ml/MapStringToDlpRow.java  |  59 +++++
 .../sdk/extensions/ml/BatchRequestForDlpTest.java  |  63 +++++
 .../sdk/extensions/ml/DLPDeidentifyTextTest.java   | 101 ++++++++
 .../beam/sdk/extensions/ml/DLPInspectTextTest.java | 101 ++++++++
 .../sdk/extensions/ml/DLPReidentifyTextTest.java   | 101 ++++++++
 .../sdk/extensions/ml/DLPTextOperationsIT.java     | 159 ++++++++++++
 .../sdk/extensions/ml/MapStringToDlpRowTest.java   |  69 +++++
 13 files changed, 1591 insertions(+), 2 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 0168d04..d00b9c7 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -93,6 +93,10 @@
 * `--direct_num_workers=0` is supported for FnApi runner. It will set the number of threads/subprocesses to number of cores of the machine executing the pipeline ([BEAM-9443](https://issues.apache.org/jira/browse/BEAM-9443)).
 * Python SDK now has experimental support for SqlTransform ([BEAM-8603](https://issues.apache.org/jira/browse/BEAM-8603)).
 * Add OnWindowExpiration method to Stateful DoFn ([BEAM-1589](https://issues.apache.org/jira/browse/BEAM-1589)).
+* Added PTransforms for Google Cloud DLP (Data Loss Prevention) services integration ([BEAM-9723](https://issues.apache.org/jira/browse/BEAM-9723)):
+    * Inspection of data,
+    * Deidentification of data,
+    * Reidentification of data.
 * Add a more complete I/O support matrix in the documentation site ([BEAM-9916](https://issues.apache.org/jira/browse/BEAM-9916)).
 * Upgrade Sphinx to 3.0.3 for building PyDoc.
 
diff --git a/sdks/java/extensions/ml/build.gradle b/sdks/java/extensions/ml/build.gradle
index 1f61c6a..c5e1ed3 100644
--- a/sdks/java/extensions/ml/build.gradle
+++ b/sdks/java/extensions/ml/build.gradle
@@ -1,3 +1,5 @@
+import groovy.json.JsonOutput
+
 /*
  *
  *  * Licensed to the Apache Software Foundation (ASF) under one
@@ -19,7 +21,7 @@
  */
 
 plugins { id 'org.apache.beam.module' }
-applyJavaNature(automaticModuleName: 'org.apache.beam.sdk.extensions.protobuf')
+applyJavaNature(automaticModuleName: 'org.apache.beam.sdk.extensions.ml')
 
 description = 'Apache Beam :: SDKs :: Java :: Extensions :: ML'
 
@@ -27,16 +29,24 @@ dependencies {
     compile project(path: ":sdks:java:core", configuration: "shadow")
     compile project(":sdks:java:expansion-service")
     compile 'com.google.cloud:google-cloud-video-intelligence:1.2.0'
+    compile 'com.google.cloud:google-cloud-dlp:1.1.4'
     compile 'com.google.cloud:google-cloud-language:1.99.4'
+    provided library.java.junit
     testCompile project(path: ':sdks:java:core', configuration: 'shadowTest')
     testCompile library.java.mockito_core
     testCompile 'com.google.cloud:google-cloud-video-intelligence:1.2.0'
+    testCompile 'com.google.cloud:google-cloud-dlp:1.1.4'
+    testCompile project(path: ":sdks:java:extensions:google-cloud-platform-core", configuration: "testRuntime")
     testCompile 'com.google.cloud:google-cloud-language:1.99.4'
-    testCompile library.java.junit
     testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
     testRuntimeOnly project(":runners:google-cloud-dataflow-java")
 }
 
 project.test {
+    def gcpProject = project.findProperty("gcpProject") ?: 'apache-beam-testing'
     include "**/**IT.class"
+    def pipelineOptions = [
+            "--project=${gcpProject}"
+    ]
+    systemProperty "beamTestPipelineOptions", JsonOutput.toJson(pipelineOptions)
 }
diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/BatchRequestForDLP.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/BatchRequestForDLP.java
new file mode 100644
index 0000000..aabac4e
--- /dev/null
+++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/BatchRequestForDLP.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.privacy.dlp.v2.Table;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Batches input rows to reduce number of requests sent to Cloud DLP service. */
+@Experimental
+class BatchRequestForDLP extends DoFn<KV<String, Table.Row>, KV<String, Iterable<Table.Row>>> {
+  public static final Logger LOG = LoggerFactory.getLogger(BatchRequestForDLP.class);
+
+  private final Counter numberOfRowsBagged =
+      Metrics.counter(BatchRequestForDLP.class, "numberOfRowsBagged");
+
+  private final Integer batchSizeBytes;
+
+  @StateId("elementsBag")
+  private final StateSpec<BagState<KV<String, Table.Row>>> elementsBag = StateSpecs.bag();
+
+  @TimerId("eventTimer")
+  private final TimerSpec eventTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+  /**
+   * Constructs the batching DoFn.
+   *
+   * @param batchSize Desired batch size in bytes.
+   */
+  public BatchRequestForDLP(Integer batchSize) {
+    this.batchSizeBytes = batchSize;
+  }
+
+  @ProcessElement
+  public void process(
+      @Element KV<String, Table.Row> element,
+      @StateId("elementsBag") BagState<KV<String, Table.Row>> elementsBag,
+      @TimerId("eventTimer") Timer eventTimer,
+      BoundedWindow w) {
+    elementsBag.add(element);
+    eventTimer.set(w.maxTimestamp());
+  }
+
+  /**
+   * Outputs the elements buffered in the elementsBag in batches of desired size.
+   *
+   * @param elementsBag element buffer.
+   * @param output Batched input elements.
+   */
+  @OnTimer("eventTimer")
+  public void onTimer(
+      @StateId("elementsBag") BagState<KV<String, Table.Row>> elementsBag,
+      OutputReceiver<KV<String, Iterable<Table.Row>>> output) {
+    if (elementsBag.read().iterator().hasNext()) {
+      String key = elementsBag.read().iterator().next().getKey();
+      AtomicInteger bufferSize = new AtomicInteger();
+      List<Table.Row> rows = new ArrayList<>();
+      elementsBag
+          .read()
+          .forEach(
+              element -> {
+                int elementSize = element.getValue().getSerializedSize();
+                boolean clearBuffer = bufferSize.intValue() + elementSize > batchSizeBytes;
+                if (clearBuffer) {
+                  LOG.debug(
+                      "Clear buffer of {} bytes, Key {}", bufferSize.intValue(), element.getKey());
+                  numberOfRowsBagged.inc(rows.size());
+                  output.output(KV.of(element.getKey(), rows));
+                  rows.clear();
+                  bufferSize.set(0);
+                }
+                rows.add(element.getValue());
+                bufferSize.getAndAdd(element.getValue().getSerializedSize());
+              });
+      if (!rows.isEmpty()) {
+        LOG.debug("Outputting remaining {} rows.", rows.size());
+        numberOfRowsBagged.inc(rows.size());
+        output.output(KV.of(key, rows));
+      }
+    }
+  }
+}
diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyText.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyText.java
new file mode 100644
index 0000000..0502950
--- /dev/null
+++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyText.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.dlp.v2.DlpServiceClient;
+import com.google.privacy.dlp.v2.ContentItem;
+import com.google.privacy.dlp.v2.DeidentifyConfig;
+import com.google.privacy.dlp.v2.DeidentifyContentRequest;
+import com.google.privacy.dlp.v2.DeidentifyContentResponse;
+import com.google.privacy.dlp.v2.FieldId;
+import com.google.privacy.dlp.v2.InspectConfig;
+import com.google.privacy.dlp.v2.ProjectName;
+import com.google.privacy.dlp.v2.Table;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * A {@link PTransform} connecting to Cloud DLP (https://cloud.google.com/dlp/docs/libraries) and
+ * deidentifying text according to provided settings. The transform supports both columnar delimited
+ * input data (eg. CSV) and unstructured input.
+ *
+ * <p>headerColumns and columnDelimiter must be set together. When they are, the delimiter is used
+ * to split input values into columns, and the header view is passed to the DLP call as a side
+ * input that provides the column names of the {@link Table} sent to DLP. When neither is set, input
+ * is assumed to be unstructured and is sent as a single column named "value".
+ *
+ * <p>Either deidentifyTemplateName (String) or deidentifyConfig ({@link DeidentifyConfig}) needs
+ * to be set. inspectTemplateName and inspectConfig ({@link InspectConfig}) are optional.
+ *
+ * <p>Batch size limits, in bytes, how many rows are grouped into a single DLP request. It may not
+ * exceed {@link #DLP_PAYLOAD_LIMIT_BYTES}.
+ *
+ * <p>The transform consumes {@link KV} of {@link String}s (assumed to be filename as key and
+ * contents as value) and outputs {@link KV} of {@link String} (eg. filename) and {@link
+ * DeidentifyContentResponse}, which will contain {@link Table} of results for the user to consume.
+ */
+@Experimental
+@AutoValue
+public abstract class DLPDeidentifyText
+    extends PTransform<
+        PCollection<KV<String, String>>, PCollection<KV<String, DeidentifyContentResponse>>> {
+
+  public static final Integer DLP_PAYLOAD_LIMIT_BYTES = 524000;
+
+  /** @return Template name for data inspection. */
+  @Nullable
+  public abstract String getInspectTemplateName();
+
+  /** @return Template name for data deidentification. */
+  @Nullable
+  public abstract String getDeidentifyTemplateName();
+
+  /**
+   * @return Configuration object for data inspection. If present, supersedes the template settings.
+   */
+  @Nullable
+  public abstract InspectConfig getInspectConfig();
+
+  /** @return Configuration object for deidentification. If present, supersedes the template. */
+  @Nullable
+  public abstract DeidentifyConfig getDeidentifyConfig();
+
+  /** @return List of column names if the input KV value is a delimited row. */
+  @Nullable
+  public abstract PCollectionView<List<String>> getHeaderColumns();
+
+  /** @return Delimiter to be used when splitting values from input strings into columns. */
+  @Nullable
+  public abstract String getColumnDelimiter();
+
+  /** @return Size of input elements batch to be sent to Cloud DLP service in one request. */
+  public abstract Integer getBatchSizeBytes();
+
+  /** @return ID of Google Cloud project to be used when deidentifying data. */
+  public abstract String getProjectId();
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+    /** @param inspectTemplateName Template name for data inspection. */
+    public abstract Builder setInspectTemplateName(String inspectTemplateName);
+
+    /** @param headerColumns List of column names if the input KV value is a delimited row. */
+    public abstract Builder setHeaderColumns(PCollectionView<List<String>> headerColumns);
+
+    /**
+     * @param delimiter Delimiter to be used when splitting values from input strings into columns.
+     */
+    public abstract Builder setColumnDelimiter(String delimiter);
+
+    /**
+     * @param batchSize Size of input elements batch to be sent to Cloud DLP service in one request.
+     */
+    public abstract Builder setBatchSizeBytes(Integer batchSize);
+
+    /** @param projectId ID of Google Cloud project to be used when deidentifying data. */
+    public abstract Builder setProjectId(String projectId);
+
+    /** @param deidentifyTemplateName Template name for data deidentification. */
+    public abstract Builder setDeidentifyTemplateName(String deidentifyTemplateName);
+
+    /**
+     * @param inspectConfig Configuration object for data inspection. If present, supersedes the
+     *     template settings.
+     */
+    public abstract Builder setInspectConfig(InspectConfig inspectConfig);
+
+    /**
+     * @param deidentifyConfig Configuration object for data deidentification. If present,
+     *     supersedes the template settings.
+     */
+    public abstract Builder setDeidentifyConfig(DeidentifyConfig deidentifyConfig);
+
+    abstract DLPDeidentifyText autoBuild();
+
+    /**
+     * Builds and validates the transform.
+     *
+     * @throws IllegalArgumentException if neither deidentifyConfig nor deidentifyTemplateName is
+     *     set, if the batch size exceeds {@link #DLP_PAYLOAD_LIMIT_BYTES}, or if only one of
+     *     headerColumns / columnDelimiter is set.
+     */
+    public DLPDeidentifyText build() {
+      DLPDeidentifyText dlpDeidentifyText = autoBuild();
+      if (dlpDeidentifyText.getDeidentifyConfig() == null
+          && dlpDeidentifyText.getDeidentifyTemplateName() == null) {
+        throw new IllegalArgumentException(
+            "Either deidentifyConfig or deidentifyTemplateName need to be set!");
+      }
+      if (dlpDeidentifyText.getBatchSizeBytes() > DLP_PAYLOAD_LIMIT_BYTES) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Batch size is too large! It should be smaller or equal than %d.",
+                DLP_PAYLOAD_LIMIT_BYTES));
+      }
+      if (dlpDeidentifyText.getColumnDelimiter() == null
+          && dlpDeidentifyText.getHeaderColumns() != null) {
+        throw new IllegalArgumentException(
+            "Column delimiter should be set if headers are present.");
+      }
+      if (dlpDeidentifyText.getHeaderColumns() == null
+          && dlpDeidentifyText.getColumnDelimiter() != null) {
+        throw new IllegalArgumentException(
+            "Column headers should be supplied when delimiter is present.");
+      }
+      return dlpDeidentifyText;
+    }
+  }
+
+  public static DLPDeidentifyText.Builder newBuilder() {
+    return new AutoValue_DLPDeidentifyText.Builder();
+  }
+
+  /**
+   * The transform converts the contents of input PCollection into {@link Table.Row}s and then calls
+   * Cloud DLP service to perform the deidentification according to provided settings.
+   *
+   * @param input input PCollection
+   * @return PCollection after transformations
+   */
+  @Override
+  public PCollection<KV<String, DeidentifyContentResponse>> expand(
+      PCollection<KV<String, String>> input) {
+    ParDo.SingleOutput<KV<String, Iterable<Table.Row>>, KV<String, DeidentifyContentResponse>>
+        deidentifyParDo =
+            ParDo.of(
+                new DeidentifyText(
+                    getProjectId(),
+                    getInspectTemplateName(),
+                    getDeidentifyTemplateName(),
+                    getInspectConfig(),
+                    getDeidentifyConfig(),
+                    getHeaderColumns()));
+    if (getHeaderColumns() != null) {
+      // DeidentifyText reads the headers through c.sideInput(), which only works for views that
+      // are registered on the ParDo.
+      deidentifyParDo = deidentifyParDo.withSideInputs(getHeaderColumns());
+    }
+    return input
+        .apply(ParDo.of(new MapStringToDlpRow(getColumnDelimiter())))
+        .apply("Batch Contents", ParDo.of(new BatchRequestForDLP(getBatchSizeBytes())))
+        .apply("DLPDeidentify", deidentifyParDo);
+  }
+
+  /** DoFn performing calls to Cloud DLP service on GCP. */
+  static class DeidentifyText
+      extends DoFn<KV<String, Iterable<Table.Row>>, KV<String, DeidentifyContentResponse>> {
+    private final String projectId;
+    private final String inspectTemplateName;
+    private final String deidentifyTemplateName;
+    private final InspectConfig inspectConfig;
+    private final DeidentifyConfig deidentifyConfig;
+    private final PCollectionView<List<String>> headerColumns;
+    private transient DeidentifyContentRequest.Builder requestBuilder;
+    private transient DlpServiceClient dlpServiceClient;
+
+    /**
+     * Prepares the request template shared by all elements of this DoFn instance and creates the
+     * DLP client.
+     */
+    @Setup
+    public void setup() throws IOException {
+      requestBuilder =
+          DeidentifyContentRequest.newBuilder().setParent(ProjectName.of(projectId).toString());
+      if (inspectTemplateName != null) {
+        requestBuilder.setInspectTemplateName(inspectTemplateName);
+      }
+      if (inspectConfig != null) {
+        requestBuilder.setInspectConfig(inspectConfig);
+      }
+      if (deidentifyConfig != null) {
+        requestBuilder.setDeidentifyConfig(deidentifyConfig);
+      }
+      if (deidentifyTemplateName != null) {
+        requestBuilder.setDeidentifyTemplateName(deidentifyTemplateName);
+      }
+      dlpServiceClient = DlpServiceClient.create();
+    }
+
+    /** Closes the DLP client, if {@link #setup()} got far enough to create it. */
+    @Teardown
+    public void teardown() {
+      if (dlpServiceClient != null) {
+        dlpServiceClient.close();
+      }
+    }
+
+    /**
+     * @param projectId ID of GCP project that should be used for deidentification.
+     * @param inspectTemplateName Template name for inspection. Optional.
+     * @param deidentifyTemplateName Template name for deidentification. Either this or
+     *     deidentifyConfig is required.
+     * @param inspectConfig Configuration object for inspection. Optional.
+     * @param deidentifyConfig Deidentification config containing data transformations. Either this
+     *     or deidentifyTemplateName is required.
+     * @param headerColumns Header row of the table if applicable.
+     */
+    public DeidentifyText(
+        String projectId,
+        String inspectTemplateName,
+        String deidentifyTemplateName,
+        InspectConfig inspectConfig,
+        DeidentifyConfig deidentifyConfig,
+        PCollectionView<List<String>> headerColumns) {
+      this.projectId = projectId;
+      this.inspectTemplateName = inspectTemplateName;
+      this.deidentifyTemplateName = deidentifyTemplateName;
+      this.inspectConfig = inspectConfig;
+      this.deidentifyConfig = deidentifyConfig;
+      this.headerColumns = headerColumns;
+    }
+
+    /**
+     * Sends one batch of rows to DLP as a {@link Table} and outputs the response under the batch's
+     * key.
+     */
+    @ProcessElement
+    public void processElement(ProcessContext c) throws IOException {
+      String fileName = c.element().getKey();
+      List<FieldId> dlpTableHeaders;
+      if (headerColumns != null) {
+        dlpTableHeaders =
+            c.sideInput(headerColumns).stream()
+                .map(header -> FieldId.newBuilder().setName(header).build())
+                .collect(Collectors.toList());
+      } else {
+        // handle unstructured input
+        dlpTableHeaders = new ArrayList<>();
+        dlpTableHeaders.add(FieldId.newBuilder().setName("value").build());
+      }
+      Table table =
+          Table.newBuilder()
+              .addAllHeaders(dlpTableHeaders)
+              .addAllRows(c.element().getValue())
+              .build();
+      ContentItem contentItem = ContentItem.newBuilder().setTable(table).build();
+      this.requestBuilder.setItem(contentItem);
+      DeidentifyContentResponse response =
+          dlpServiceClient.deidentifyContent(this.requestBuilder.build());
+      c.output(KV.of(fileName, response));
+    }
+  }
+}
diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPInspectText.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPInspectText.java
new file mode 100644
index 0000000..ff58674
--- /dev/null
+++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPInspectText.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.dlp.v2.DlpServiceClient;
+import com.google.privacy.dlp.v2.ContentItem;
+import com.google.privacy.dlp.v2.FieldId;
+import com.google.privacy.dlp.v2.InspectConfig;
+import com.google.privacy.dlp.v2.InspectContentRequest;
+import com.google.privacy.dlp.v2.InspectContentResponse;
+import com.google.privacy.dlp.v2.ProjectName;
+import com.google.privacy.dlp.v2.Table;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * A {@link PTransform} connecting to Cloud DLP (https://cloud.google.com/dlp/docs/libraries) and
+ * inspecting text for identifying data according to provided settings. The transform supports both
+ * delimited columnar input data (eg. CSV) and unstructured input.
+ *
+ * <p>headerColumns and columnDelimiter must be set together. When they are, the delimiter is used
+ * to split input values into columns, and the header view is passed to the DLP call as a side
+ * input that provides the column names of the {@link Table} sent to DLP. When neither is set, input
+ * is assumed to be unstructured and is sent as a single column named "value".
+ *
+ * <p>Either inspectTemplateName (String) or inspectConfig ({@link InspectConfig}) needs to be set.
+ *
+ * <p>Batch size limits, in bytes, how many rows are grouped into a single DLP request. It may not
+ * exceed {@link #DLP_PAYLOAD_LIMIT_BYTES}.
+ *
+ * <p>The transform consumes {@link KV} of {@link String}s (assumed to be filename as key and
+ * contents as value) and outputs {@link KV} of {@link String} (eg. filename) and {@link
+ * InspectContentResponse}, which will contain a list of {@link
+ * com.google.privacy.dlp.v2.InspectResult} for the user to consume.
+ */
+@Experimental
+@AutoValue
+public abstract class DLPInspectText
+    extends PTransform<
+        PCollection<KV<String, String>>, PCollection<KV<String, InspectContentResponse>>> {
+
+  public static final Integer DLP_PAYLOAD_LIMIT_BYTES = 524000;
+
+  /** @return Template name for data inspection. */
+  @Nullable
+  public abstract String getInspectTemplateName();
+
+  /**
+   * @return Configuration object for data inspection. If present, supersedes the template settings.
+   */
+  @Nullable
+  public abstract InspectConfig getInspectConfig();
+
+  /** @return Size of input elements batch to be sent to Cloud DLP service in one request. */
+  public abstract Integer getBatchSizeBytes();
+
+  /** @return ID of Google Cloud project to be used when inspecting data. */
+  public abstract String getProjectId();
+
+  /** @return Delimiter to be used when splitting values from input strings into columns. */
+  @Nullable
+  public abstract String getColumnDelimiter();
+
+  /** @return List of column names if the input KV value is a delimited row. */
+  @Nullable
+  public abstract PCollectionView<List<String>> getHeaderColumns();
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+    /** @param inspectTemplateName Template name for data inspection. */
+    public abstract Builder setInspectTemplateName(String inspectTemplateName);
+
+    /**
+     * @param inspectConfig Configuration object for data inspection. If present, supersedes the
+     *     template settings.
+     */
+    public abstract Builder setInspectConfig(InspectConfig inspectConfig);
+
+    /**
+     * @param batchSize Size of input elements batch to be sent to Cloud DLP service in one request.
+     */
+    public abstract Builder setBatchSizeBytes(Integer batchSize);
+
+    /** @param projectId ID of Google Cloud project to be used when inspecting data. */
+    public abstract Builder setProjectId(String projectId);
+
+    /**
+     * @param delimiter Delimiter to be used when splitting values from input strings into columns.
+     */
+    public abstract Builder setColumnDelimiter(String delimiter);
+
+    /** @param headerColumns List of column names if the input KV value is a delimited row. */
+    public abstract Builder setHeaderColumns(PCollectionView<List<String>> headerColumns);
+
+    abstract DLPInspectText autoBuild();
+
+    /**
+     * Builds and validates the transform.
+     *
+     * @throws IllegalArgumentException if neither inspectTemplateName nor inspectConfig is set, if
+     *     the batch size exceeds {@link #DLP_PAYLOAD_LIMIT_BYTES}, or if only one of headerColumns /
+     *     columnDelimiter is set.
+     */
+    public DLPInspectText build() {
+      DLPInspectText inspectText = autoBuild();
+      if (inspectText.getInspectTemplateName() == null && inspectText.getInspectConfig() == null) {
+        throw new IllegalArgumentException(
+            "Either inspectTemplateName or inspectConfig must be supplied!");
+      }
+      if (inspectText.getBatchSizeBytes() > DLP_PAYLOAD_LIMIT_BYTES) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Batch size is too large! It should be smaller or equal than %d.",
+                DLP_PAYLOAD_LIMIT_BYTES));
+      }
+      if (inspectText.getColumnDelimiter() == null && inspectText.getHeaderColumns() != null) {
+        throw new IllegalArgumentException(
+            "Column delimiter should be set if headers are present.");
+      }
+      if (inspectText.getHeaderColumns() == null && inspectText.getColumnDelimiter() != null) {
+        throw new IllegalArgumentException(
+            "Column headers should be supplied when delimiter is present.");
+      }
+      return inspectText;
+    }
+  }
+
+  public static Builder newBuilder() {
+    return new AutoValue_DLPInspectText.Builder();
+  }
+
+  /**
+   * The transform converts the contents of input PCollection into {@link Table.Row}s and then calls
+   * Cloud DLP service to perform the data inspection according to provided settings.
+   *
+   * @param input input PCollection
+   * @return PCollection after transformations
+   */
+  @Override
+  public PCollection<KV<String, InspectContentResponse>> expand(
+      PCollection<KV<String, String>> input) {
+    ParDo.SingleOutput<KV<String, Iterable<Table.Row>>, KV<String, InspectContentResponse>>
+        inspectParDo =
+            ParDo.of(
+                new InspectData(
+                    getProjectId(),
+                    getInspectTemplateName(),
+                    getInspectConfig(),
+                    getHeaderColumns()));
+    if (getHeaderColumns() != null) {
+      // InspectData reads the headers through c.sideInput(), which only works for views that are
+      // registered on the ParDo.
+      inspectParDo = inspectParDo.withSideInputs(getHeaderColumns());
+    }
+    return input
+        .apply(ParDo.of(new MapStringToDlpRow(getColumnDelimiter())))
+        .apply("Batch Contents", ParDo.of(new BatchRequestForDLP(getBatchSizeBytes())))
+        .apply("DLPInspect", inspectParDo);
+  }
+
+  /** Performs calls to Cloud DLP service on GCP to inspect input data. */
+  static class InspectData
+      extends DoFn<KV<String, Iterable<Table.Row>>, KV<String, InspectContentResponse>> {
+    private final String projectId;
+    private final String inspectTemplateName;
+    private final InspectConfig inspectConfig;
+    private final PCollectionView<List<String>> headerColumns;
+    private transient DlpServiceClient dlpServiceClient;
+    private transient InspectContentRequest.Builder requestBuilder;
+
+    /**
+     * @param projectId ID of GCP project that should be used for data inspection.
+     * @param inspectTemplateName Template name for inspection.
+     * @param inspectConfig Configuration object for inspection.
+     * @param headerColumns Header row of the table if applicable.
+     */
+    public InspectData(
+        String projectId,
+        String inspectTemplateName,
+        InspectConfig inspectConfig,
+        PCollectionView<List<String>> headerColumns) {
+      this.projectId = projectId;
+      this.inspectTemplateName = inspectTemplateName;
+      this.inspectConfig = inspectConfig;
+      this.headerColumns = headerColumns;
+    }
+
+    /**
+     * Prepares the request template shared by all elements of this DoFn instance and creates the
+     * DLP client.
+     */
+    @Setup
+    public void setup() throws IOException {
+      this.requestBuilder =
+          InspectContentRequest.newBuilder().setParent(ProjectName.of(this.projectId).toString());
+      if (inspectTemplateName != null) {
+        requestBuilder.setInspectTemplateName(this.inspectTemplateName);
+      }
+      if (inspectConfig != null) {
+        requestBuilder.setInspectConfig(inspectConfig);
+      }
+      dlpServiceClient = DlpServiceClient.create();
+    }
+
+    /** Closes the DLP client, if {@link #setup()} got far enough to create it. */
+    @Teardown
+    public void teardown() {
+      if (dlpServiceClient != null) {
+        dlpServiceClient.close();
+      }
+    }
+
+    /**
+     * Sends one batch of rows to DLP as a {@link Table} and outputs the inspection response under
+     * the batch's key.
+     */
+    @ProcessElement
+    public void processElement(ProcessContext c) throws IOException {
+      List<FieldId> tableHeaders;
+      if (headerColumns != null) {
+        tableHeaders =
+            c.sideInput(headerColumns).stream()
+                .map(header -> FieldId.newBuilder().setName(header).build())
+                .collect(Collectors.toList());
+      } else {
+        // handle unstructured input
+        tableHeaders = new ArrayList<>();
+        tableHeaders.add(FieldId.newBuilder().setName("value").build());
+      }
+      Table table =
+          Table.newBuilder().addAllHeaders(tableHeaders).addAllRows(c.element().getValue()).build();
+      ContentItem contentItem = ContentItem.newBuilder().setTable(table).build();
+      this.requestBuilder.setItem(contentItem);
+      InspectContentResponse response =
+          dlpServiceClient.inspectContent(this.requestBuilder.build());
+      c.output(KV.of(c.element().getKey(), response));
+    }
+  }
+}
diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPReidentifyText.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPReidentifyText.java
new file mode 100644
index 0000000..5dc20cb
--- /dev/null
+++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPReidentifyText.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.dlp.v2.DlpServiceClient;
+import com.google.privacy.dlp.v2.ContentItem;
+import com.google.privacy.dlp.v2.DeidentifyConfig;
+import com.google.privacy.dlp.v2.FieldId;
+import com.google.privacy.dlp.v2.InspectConfig;
+import com.google.privacy.dlp.v2.ProjectName;
+import com.google.privacy.dlp.v2.ReidentifyContentRequest;
+import com.google.privacy.dlp.v2.ReidentifyContentResponse;
+import com.google.privacy.dlp.v2.Table;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * A {@link PTransform} connecting to Cloud DLP (https://cloud.google.com/dlp/docs/libraries) and
+ * reidentifying (reversing a previous deidentification of) text according to provided settings.
+ *
+ * <p>The transform supports both delimited columnar input data and unstructured input.
+ *
+ * <p>The headerColumns side input and the columnDelimiter must be set together. {@link
+ * Builder#build()} throws {@link IllegalArgumentException} if only one of them is set. If neither
+ * is set, input is assumed to be unstructured and each input value is sent as a single column
+ * named "value".
+ *
+ * <p>The transform consumes {@link KV} of {@link String}s (assumed to be filename as key and
+ * contents as value) and outputs {@link KV} of {@link String} (eg. filename) and {@link
+ * ReidentifyContentResponse}, which will contain {@link Table} of results for the user to consume.
+ *
+ * <p>Either reidentifyTemplateName {@link String} or reidentifyConfig {@link DeidentifyConfig} need
+ * to be set. inspectConfig {@link InspectConfig} and inspectTemplateName {@link String} are
+ * optional.
+ *
+ * <p>Batch size defines how big the batches sent to DLP at once are, in bytes. It may not exceed
+ * {@link #DLP_PAYLOAD_LIMIT_BYTES}.
+ */
+@Experimental
+@AutoValue
+public abstract class DLPReidentifyText
+    extends PTransform<
+        PCollection<KV<String, String>>, PCollection<KV<String, ReidentifyContentResponse>>> {
+
+  /** Largest batch size, in bytes, accepted by {@link Builder#build()}. */
+  public static final Integer DLP_PAYLOAD_LIMIT_BYTES = 524000;
+
+  /** @return Template name for data inspection. */
+  @Nullable
+  public abstract String getInspectTemplateName();
+
+  /** @return Template name for data reidentification. */
+  @Nullable
+  public abstract String getReidentifyTemplateName();
+
+  /**
+   * @return Configuration object for data inspection. If present, supersedes the template settings.
+   */
+  @Nullable
+  public abstract InspectConfig getInspectConfig();
+
+  /** @return Configuration object for reidentification. If present, supersedes the template. */
+  @Nullable
+  public abstract DeidentifyConfig getReidentifyConfig();
+
+  /** @return Delimiter to be used when splitting values from input strings into columns. */
+  @Nullable
+  public abstract String getColumnDelimiter();
+
+  /** @return List of column names if the input KV value is a delimited row. */
+  @Nullable
+  public abstract PCollectionView<List<String>> getHeaderColumns();
+
+  /** @return Size of input elements batch to be sent to Cloud DLP service in one request. */
+  public abstract Integer getBatchSizeBytes();
+
+  /** @return ID of Google Cloud project to be used when reidentifying data. */
+  public abstract String getProjectId();
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+    /** @param inspectTemplateName Template name for data inspection. */
+    public abstract Builder setInspectTemplateName(String inspectTemplateName);
+
+    /**
+     * @param inspectConfig Configuration object for data inspection. If present, supersedes the
+     *     template settings.
+     */
+    public abstract Builder setInspectConfig(InspectConfig inspectConfig);
+
+    /**
+     * @param reidentifyConfig Configuration object for data reidentification. If present,
+     *     supersedes the template settings.
+     */
+    public abstract Builder setReidentifyConfig(DeidentifyConfig reidentifyConfig);
+
+    /** @param reidentifyTemplateName Template name for data reidentification. */
+    public abstract Builder setReidentifyTemplateName(String reidentifyTemplateName);
+
+    /**
+     * @param batchSize Size of input elements batch to be sent to Cloud DLP service in one request.
+     */
+    public abstract Builder setBatchSizeBytes(Integer batchSize);
+
+    /** @param headerColumns List of column names if the input KV value is a delimited row. */
+    public abstract Builder setHeaderColumns(PCollectionView<List<String>> headerColumns);
+
+    /**
+     * @param delimiter Delimiter to be used when splitting values from input strings into columns.
+     */
+    public abstract Builder setColumnDelimiter(String delimiter);
+
+    /** @param projectId ID of Google Cloud project to be used when reidentifying data. */
+    public abstract Builder setProjectId(String projectId);
+
+    abstract DLPReidentifyText autoBuild();
+
+    /**
+     * Builds the transform and validates its configuration.
+     *
+     * @return the configured {@link DLPReidentifyText}.
+     * @throws IllegalArgumentException if any of the following holds:
+     *     <ul>
+     *       <li>neither reidentifyConfig nor reidentifyTemplateName is set;
+     *       <li>the batch size exceeds {@link DLPReidentifyText#DLP_PAYLOAD_LIMIT_BYTES};
+     *       <li>only one of columnDelimiter and headerColumns is set.
+     *     </ul>
+     */
+    public DLPReidentifyText build() {
+      DLPReidentifyText dlpReidentifyText = autoBuild();
+      if (dlpReidentifyText.getReidentifyConfig() == null
+          && dlpReidentifyText.getReidentifyTemplateName() == null) {
+        throw new IllegalArgumentException(
+            "Either reidentifyConfig or reidentifyTemplateName need to be set!");
+      }
+      if (dlpReidentifyText.getBatchSizeBytes() > DLP_PAYLOAD_LIMIT_BYTES) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Batch size is too large! It should be smaller or equal than %d.",
+                DLP_PAYLOAD_LIMIT_BYTES));
+      }
+      if (dlpReidentifyText.getColumnDelimiter() == null
+          && dlpReidentifyText.getHeaderColumns() != null) {
+        throw new IllegalArgumentException(
+            "Column delimiter should be set if headers are present.");
+      }
+      if (dlpReidentifyText.getHeaderColumns() == null
+          && dlpReidentifyText.getColumnDelimiter() != null) {
+        throw new IllegalArgumentException(
+            "Column headers should be supplied when delimiter is present.");
+      }
+      return dlpReidentifyText;
+    }
+  }
+
+  public static DLPReidentifyText.Builder newBuilder() {
+    return new AutoValue_DLPReidentifyText.Builder();
+  }
+
+  /**
+   * The transform converts the contents of input PCollection into {@link Table.Row}s and then calls
+   * Cloud DLP service to perform the reidentification according to provided settings.
+   *
+   * @param input input PCollection
+   * @return PCollection after transformations
+   */
+  @Override
+  public PCollection<KV<String, ReidentifyContentResponse>> expand(
+      PCollection<KV<String, String>> input) {
+    return input
+        .apply(ParDo.of(new MapStringToDlpRow(getColumnDelimiter())))
+        .apply("Batch Contents", ParDo.of(new BatchRequestForDLP(getBatchSizeBytes())))
+        .apply(
+            "DLPReidentify",
+            ParDo.of(
+                new ReidentifyText(
+                    getProjectId(),
+                    getInspectTemplateName(),
+                    getReidentifyTemplateName(),
+                    getInspectConfig(),
+                    getReidentifyConfig(),
+                    getHeaderColumns())));
+  }
+
+  /** Performs the calls to Cloud DLP service on GCP. */
+  static class ReidentifyText
+      extends DoFn<KV<String, Iterable<Table.Row>>, KV<String, ReidentifyContentResponse>> {
+    private final String projectId;
+    private final String inspectTemplateName;
+    private final String reidentifyTemplateName;
+    private final InspectConfig inspectConfig;
+    private final DeidentifyConfig reidentifyConfig;
+    private final PCollectionView<List<String>> headerColumns;
+    // Created in setup(); transient, so not serialized with the DoFn.
+    private transient ReidentifyContentRequest.Builder requestBuilder;
+    private transient DlpServiceClient dlpServiceClient;
+
+    /**
+     * @param projectId ID of GCP project that should be used for reidentification.
+     * @param inspectTemplateName Template name for inspection. Optional.
+     * @param reidentifyTemplateName Template name for reidentification. Either this or
+     *     reidentifyConfig is required.
+     * @param inspectConfig Configuration object for inspection. Optional.
+     * @param reidentifyConfig Reidentification config containing data transformations. Either this
+     *     or reidentifyTemplateName is required.
+     * @param headerColumns Header row of the table if applicable.
+     */
+    public ReidentifyText(
+        String projectId,
+        String inspectTemplateName,
+        String reidentifyTemplateName,
+        InspectConfig inspectConfig,
+        DeidentifyConfig reidentifyConfig,
+        PCollectionView<List<String>> headerColumns) {
+      this.projectId = projectId;
+      this.inspectTemplateName = inspectTemplateName;
+      this.reidentifyTemplateName = reidentifyTemplateName;
+      this.inspectConfig = inspectConfig;
+      this.reidentifyConfig = reidentifyConfig;
+      this.headerColumns = headerColumns;
+    }
+
+    /**
+     * Builds the request template reused for every element and creates the DLP client.
+     *
+     * @throws IOException if the {@link DlpServiceClient} cannot be created.
+     */
+    @Setup
+    public void setup() throws IOException {
+      requestBuilder =
+          ReidentifyContentRequest.newBuilder().setParent(ProjectName.of(projectId).toString());
+      if (inspectTemplateName != null) {
+        requestBuilder.setInspectTemplateName(inspectTemplateName);
+      }
+      if (inspectConfig != null) {
+        requestBuilder.setInspectConfig(inspectConfig);
+      }
+      if (reidentifyConfig != null) {
+        requestBuilder.setReidentifyConfig(reidentifyConfig);
+      }
+      if (reidentifyTemplateName != null) {
+        requestBuilder.setReidentifyTemplateName(reidentifyTemplateName);
+      }
+      dlpServiceClient = DlpServiceClient.create();
+    }
+
+    /**
+     * Closes the DLP client. The null check covers a {@link #setup()} that failed before the
+     * client was created, so teardown does not add a {@link NullPointerException}.
+     */
+    @Teardown
+    public void teardown() {
+      if (dlpServiceClient != null) {
+        dlpServiceClient.close();
+      }
+    }
+
+    /**
+     * Wraps one batch of rows in a DLP {@link Table} and sends it for reidentification. The
+     * response is emitted under the same key as the input batch.
+     */
+    @ProcessElement
+    public void processElement(ProcessContext context) throws IOException {
+      List<FieldId> tableHeaders;
+      if (headerColumns != null) {
+        tableHeaders =
+            context.sideInput(headerColumns).stream()
+                .map(header -> FieldId.newBuilder().setName(header).build())
+                .collect(Collectors.toList());
+      } else {
+        // handle unstructured input.
+        tableHeaders = new ArrayList<>();
+        tableHeaders.add(FieldId.newBuilder().setName("value").build());
+      }
+      Table table =
+          Table.newBuilder()
+              .addAllHeaders(tableHeaders)
+              .addAllRows(context.element().getValue())
+              .build();
+      ContentItem contentItem = ContentItem.newBuilder().setTable(table).build();
+      // The builder is reused across elements; setItem replaces the previous element's content.
+      this.requestBuilder.setItem(contentItem);
+      ReidentifyContentResponse response =
+          dlpServiceClient.reidentifyContent(requestBuilder.build());
+      context.output(KV.of(context.element().getKey(), response));
+    }
+  }
+}
diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/MapStringToDlpRow.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/MapStringToDlpRow.java
new file mode 100644
index 0000000..944656c
--- /dev/null
+++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/MapStringToDlpRow.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.privacy.dlp.v2.Table;
+import com.google.privacy.dlp.v2.Value;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * Maps {@link KV}s of {@link String}s into KV<{@link String}, {@link Table.Row}> for further
+ * processing in the DLP transforms.
+ *
+ * <p>If a delimiter of values isn't provided, input is assumed to be unstructured and the input KV
+ * value is saved in a single column of output {@link Table.Row}.
+ *
+ * <p>The delimiter is interpreted as a regular expression, as by {@link String#split(String, int)}.
+ * Trailing empty fields are kept, so a delimited row still yields one value per column when its
+ * last columns are empty.
+ */
+class MapStringToDlpRow extends DoFn<KV<String, String>, KV<String, Table.Row>> {
+  private final String delimiter;
+
+  /**
+   * @param delimiter Delimiter (a regular expression) of values in the delimited value row that may
+   *     be in the value of input KV, or {@code null} for unstructured input.
+   */
+  public MapStringToDlpRow(String delimiter) {
+    this.delimiter = delimiter;
+  }
+
+  @ProcessElement
+  public void processElement(ProcessContext context) {
+    Table.Row.Builder rowBuilder = Table.Row.newBuilder();
+    String line = Objects.requireNonNull(context.element().getValue());
+    if (delimiter != null) {
+      // A negative limit keeps trailing empty strings. The default split() drops them, which
+      // would produce rows with fewer values than header columns (e.g. "a;b;;").
+      List<String> values = Arrays.asList(line.split(delimiter, -1));
+      values.forEach(
+          value -> rowBuilder.addValues(Value.newBuilder().setStringValue(value).build()));
+    } else {
+      rowBuilder.addValues(Value.newBuilder().setStringValue(line).build());
+    }
+    context.output(KV.of(context.element().getKey(), rowBuilder.build()));
+  }
+}
diff --git a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/BatchRequestForDlpTest.java b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/BatchRequestForDlpTest.java
new file mode 100644
index 0000000..3e9a741
--- /dev/null
+++ b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/BatchRequestForDlpTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.privacy.dlp.v2.Table;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link BatchRequestForDLP}. */
+@RunWith(JUnit4.class)
+public class BatchRequestForDlpTest {
+
+  @Rule public TestPipeline testPipeline = TestPipeline.create();
+
+  /** Two small rows sharing one key are expected to come out as exactly one batch. */
+  @Test
+  public void batchesRequests() {
+    PCollection<KV<String, String>> input =
+        testPipeline.apply(Create.of(KV.of("key", "value1"), KV.of("key", "value2")));
+    PCollection<KV<String, Table.Row>> rows = input.apply(ParDo.of(new MapStringToDlpRow(null)));
+    PCollection<KV<String, Iterable<Table.Row>>> batchedRows =
+        rows.apply(ParDo.of(new BatchRequestForDLP(524000)));
+    PAssert.that(batchedRows).satisfies(new VerifyPCollectionSize());
+    testPipeline.run().waitUntilFinish();
+  }
+
+  /** Asserts that the PCollection under test holds exactly one element. */
+  private static class VerifyPCollectionSize
+      implements SerializableFunction<Iterable<KV<String, Iterable<Table.Row>>>, Void> {
+    @Override
+    public Void apply(Iterable<KV<String, Iterable<Table.Row>>> input) {
+      List<KV<String, Iterable<Table.Row>>> elements = new ArrayList<>();
+      for (KV<String, Iterable<Table.Row>> element : input) {
+        elements.add(element);
+      }
+      assertEquals(1, elements.size());
+      return null;
+    }
+  }
+}
diff --git a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyTextTest.java b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyTextTest.java
new file mode 100644
index 0000000..c2e5741
--- /dev/null
+++ b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyTextTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import static org.junit.Assert.assertThrows;
+
+import java.util.List;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Validation tests for the {@link DLPDeidentifyText} builder.
+ *
+ * <p>The first argument of {@code assertThrows} is the failure message reported when no exception
+ * is thrown. These tests check only the exception type.
+ */
+@RunWith(JUnit4.class)
+public class DLPDeidentifyTextTest {
+
+  private static final Integer BATCH_SIZE_SMALL = 200;
+  private static final String DELIMITER = ";";
+  private static final String TEMPLATE_NAME = "test_template";
+  private static final String PROJECT_ID = "test_id";
+
+  @Rule public TestPipeline testPipeline = TestPipeline.create();
+
+  /** Neither a deidentify config nor a deidentify template is supplied. */
+  @Test
+  public void throwsExceptionWhenDeidentifyConfigAndTemplatesAreEmpty() {
+    String failureMessage = "Either deidentifyConfig or deidentifyTemplateName need to be set!";
+    assertThrows(
+        failureMessage,
+        IllegalArgumentException.class,
+        () ->
+            DLPDeidentifyText.newBuilder()
+                .setColumnDelimiter(DELIMITER)
+                .setBatchSizeBytes(BATCH_SIZE_SMALL)
+                .setProjectId(PROJECT_ID)
+                .build());
+  }
+
+  /** A header side input is supplied without a column delimiter. */
+  @Test
+  public void throwsExceptionWhenDelimiterIsNullAndHeadersAreSet() {
+    PCollectionView<List<String>> headerView =
+        testPipeline.apply(Create.of("header")).apply(View.asList());
+    String failureMessage = "Column delimiter should be set if headers are present.";
+    assertThrows(
+        failureMessage,
+        IllegalArgumentException.class,
+        () ->
+            DLPDeidentifyText.newBuilder()
+                .setHeaderColumns(headerView)
+                .setDeidentifyTemplateName(TEMPLATE_NAME)
+                .setBatchSizeBytes(BATCH_SIZE_SMALL)
+                .setProjectId(PROJECT_ID)
+                .build());
+    // The header view was applied to the pipeline, so run it so it is not left unexecuted.
+    testPipeline.run().waitUntilFinish();
+  }
+
+  /** The requested batch size exceeds {@link DLPDeidentifyText#DLP_PAYLOAD_LIMIT_BYTES}. */
+  @Test
+  public void throwsExceptionWhenBatchSizeIsTooLarge() {
+    String failureMessage =
+        String.format(
+            "Batch size is too large! It should be smaller or equal than %d.",
+            DLPDeidentifyText.DLP_PAYLOAD_LIMIT_BYTES);
+    assertThrows(
+        failureMessage,
+        IllegalArgumentException.class,
+        () ->
+            DLPDeidentifyText.newBuilder()
+                .setColumnDelimiter(DELIMITER)
+                .setDeidentifyTemplateName(TEMPLATE_NAME)
+                .setBatchSizeBytes(Integer.MAX_VALUE)
+                .setProjectId(PROJECT_ID)
+                .build());
+  }
+
+  /** A column delimiter is supplied without a header side input. */
+  @Test
+  public void throwsExceptionWhenDelimiterIsSetAndHeadersAreNot() {
+    String failureMessage = "Column headers should be supplied when delimiter is present.";
+    assertThrows(
+        failureMessage,
+        IllegalArgumentException.class,
+        () ->
+            DLPDeidentifyText.newBuilder()
+                .setColumnDelimiter(DELIMITER)
+                .setDeidentifyTemplateName(TEMPLATE_NAME)
+                .setBatchSizeBytes(BATCH_SIZE_SMALL)
+                .setProjectId(PROJECT_ID)
+                .build());
+  }
+}
diff --git a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DLPInspectTextTest.java b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DLPInspectTextTest.java
new file mode 100644
index 0000000..46c8fa3
--- /dev/null
+++ b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DLPInspectTextTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import static org.junit.Assert.assertThrows;
+
+import java.util.List;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Validation tests for the {@link DLPInspectText} builder.
+ *
+ * <p>The first argument of {@code assertThrows} is the failure message reported when no exception
+ * is thrown. These tests check only the exception type.
+ */
+@RunWith(JUnit4.class)
+public class DLPInspectTextTest {
+
+  private static final Integer BATCH_SIZE_SMALL = 200;
+  private static final String DELIMITER = ";";
+  private static final String TEMPLATE_NAME = "test_template";
+  private static final String PROJECT_ID = "test_id";
+
+  @Rule public TestPipeline testPipeline = TestPipeline.create();
+
+  /** Neither an inspect template nor an inspect config is supplied. */
+  @Test
+  public void throwsExceptionWhenDeidentifyConfigAndTemplatesAreEmpty() {
+    String failureMessage = "Either inspectTemplateName or inspectConfig must be supplied!";
+    assertThrows(
+        failureMessage,
+        IllegalArgumentException.class,
+        () ->
+            DLPInspectText.newBuilder()
+                .setColumnDelimiter(DELIMITER)
+                .setBatchSizeBytes(BATCH_SIZE_SMALL)
+                .setProjectId(PROJECT_ID)
+                .build());
+  }
+
+  /** A header side input is supplied without a column delimiter. */
+  @Test
+  public void throwsExceptionWhenDelimiterIsNullAndHeadersAreSet() {
+    PCollectionView<List<String>> headerView =
+        testPipeline.apply(Create.of("header")).apply(View.asList());
+    String failureMessage = "Column delimiter should be set if headers are present.";
+    assertThrows(
+        failureMessage,
+        IllegalArgumentException.class,
+        () ->
+            DLPInspectText.newBuilder()
+                .setHeaderColumns(headerView)
+                .setInspectTemplateName(TEMPLATE_NAME)
+                .setBatchSizeBytes(BATCH_SIZE_SMALL)
+                .setProjectId(PROJECT_ID)
+                .build());
+    // The header view was applied to the pipeline, so run it so it is not left unexecuted.
+    testPipeline.run().waitUntilFinish();
+  }
+
+  /** The requested batch size exceeds {@link DLPInspectText#DLP_PAYLOAD_LIMIT_BYTES}. */
+  @Test
+  public void throwsExceptionWhenBatchSizeIsTooLarge() {
+    String failureMessage =
+        String.format(
+            "Batch size is too large! It should be smaller or equal than %d.",
+            DLPInspectText.DLP_PAYLOAD_LIMIT_BYTES);
+    assertThrows(
+        failureMessage,
+        IllegalArgumentException.class,
+        () ->
+            DLPInspectText.newBuilder()
+                .setColumnDelimiter(DELIMITER)
+                .setInspectTemplateName(TEMPLATE_NAME)
+                .setBatchSizeBytes(Integer.MAX_VALUE)
+                .setProjectId(PROJECT_ID)
+                .build());
+  }
+
+  /** A column delimiter is supplied without a header side input. */
+  @Test
+  public void throwsExceptionWhenDelimiterIsSetAndHeadersAreNot() {
+    String failureMessage = "Column headers should be supplied when delimiter is present.";
+    assertThrows(
+        failureMessage,
+        IllegalArgumentException.class,
+        () ->
+            DLPInspectText.newBuilder()
+                .setColumnDelimiter(DELIMITER)
+                .setInspectTemplateName(TEMPLATE_NAME)
+                .setBatchSizeBytes(BATCH_SIZE_SMALL)
+                .setProjectId(PROJECT_ID)
+                .build());
+  }
+}
diff --git a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DLPReidentifyTextTest.java b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DLPReidentifyTextTest.java
new file mode 100644
index 0000000..9fc0f0a
--- /dev/null
+++ b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DLPReidentifyTextTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import java.util.List;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Validation tests for the {@link DLPReidentifyText} builder.
+ *
+ * <p>Each test checks both the exception type and its message. {@link DLPReidentifyText}'s
+ * builder runs its checks in a fixed order: config/template, batch size, delimiter without
+ * headers, headers without delimiter. A configuration that breaks several rules reports the
+ * first failing check.
+ */
+@RunWith(JUnit4.class)
+public class DLPReidentifyTextTest {
+
+  @Rule public TestPipeline testPipeline = TestPipeline.create();
+
+  private static final Integer BATCH_SIZE_SMALL = 200;
+  private static final String DELIMITER = ";";
+  private static final String TEMPLATE_NAME = "test_template";
+  private static final String PROJECT_ID = "test_id";
+
+  /** Neither a reidentify config nor a reidentify template is supplied. */
+  @Test
+  public void throwsExceptionWhenDeidentifyConfigAndTemplatesAreEmpty() {
+    IllegalArgumentException e =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                DLPReidentifyText.newBuilder()
+                    .setProjectId(PROJECT_ID)
+                    .setBatchSizeBytes(BATCH_SIZE_SMALL)
+                    .setColumnDelimiter(DELIMITER)
+                    .build());
+    assertEquals(
+        "Either reidentifyConfig or reidentifyTemplateName need to be set!", e.getMessage());
+  }
+
+  /** A header side input is supplied without a column delimiter. */
+  @Test
+  public void throwsExceptionWhenDelimiterIsNullAndHeadersAreSet() {
+    PCollectionView<List<String>> header =
+        testPipeline.apply(Create.of("header")).apply(View.asList());
+    IllegalArgumentException e =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                DLPReidentifyText.newBuilder()
+                    .setProjectId(PROJECT_ID)
+                    .setBatchSizeBytes(BATCH_SIZE_SMALL)
+                    .setReidentifyTemplateName(TEMPLATE_NAME)
+                    .setHeaderColumns(header)
+                    .build());
+    assertEquals("Column delimiter should be set if headers are present.", e.getMessage());
+    // The header view was applied to the pipeline, so run it so it is not left unexecuted.
+    testPipeline.run().waitUntilFinish();
+  }
+
+  /** The requested batch size exceeds {@link DLPReidentifyText#DLP_PAYLOAD_LIMIT_BYTES}. */
+  @Test
+  public void throwsExceptionWhenBatchSizeIsTooLarge() {
+    IllegalArgumentException e =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                DLPReidentifyText.newBuilder()
+                    .setProjectId(PROJECT_ID)
+                    .setBatchSizeBytes(Integer.MAX_VALUE)
+                    .setReidentifyTemplateName(TEMPLATE_NAME)
+                    .setColumnDelimiter(DELIMITER)
+                    .build());
+    assertEquals(
+        String.format(
+            "Batch size is too large! It should be smaller or equal than %d.",
+            DLPReidentifyText.DLP_PAYLOAD_LIMIT_BYTES),
+        e.getMessage());
+  }
+
+  /** A column delimiter is supplied without a header side input. */
+  @Test
+  public void throwsExceptionWhenDelimiterIsSetAndHeadersAreNot() {
+    IllegalArgumentException e =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                DLPReidentifyText.newBuilder()
+                    .setProjectId(PROJECT_ID)
+                    .setBatchSizeBytes(BATCH_SIZE_SMALL)
+                    .setReidentifyTemplateName(TEMPLATE_NAME)
+                    .setColumnDelimiter(DELIMITER)
+                    .build());
+    assertEquals("Column headers should be supplied when delimiter is present.", e.getMessage());
+  }
+}
diff --git a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DLPTextOperationsIT.java b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DLPTextOperationsIT.java
new file mode 100644
index 0000000..0ebbfb7
--- /dev/null
+++ b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DLPTextOperationsIT.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import static org.junit.Assert.assertTrue;
+
+import com.google.privacy.dlp.v2.CharacterMaskConfig;
+import com.google.privacy.dlp.v2.DeidentifyConfig;
+import com.google.privacy.dlp.v2.DeidentifyContentResponse;
+import com.google.privacy.dlp.v2.FieldId;
+import com.google.privacy.dlp.v2.Finding;
+import com.google.privacy.dlp.v2.InfoType;
+import com.google.privacy.dlp.v2.InfoTypeTransformations;
+import com.google.privacy.dlp.v2.InspectConfig;
+import com.google.privacy.dlp.v2.InspectContentResponse;
+import com.google.privacy.dlp.v2.Likelihood;
+import com.google.privacy.dlp.v2.PrimitiveTransformation;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Integration tests for {@link DLPInspectText} and {@link DLPDeidentifyText}. Each test runs a
+ * pipeline against the GCP project configured in the pipeline's {@link GcpOptions}.
+ */
+@RunWith(JUnit4.class)
+public class DLPTextOperationsIT {
+  @Rule public TestPipeline testPipeline = TestPipeline.create();
+
+  /** Input text containing a single e-mail address. */
+  private static final String IDENTIFYING_TEXT = "mary.sue@example.com";
+
+  /** Expected result of masking every character of {@link #IDENTIFYING_TEXT} with '#'. */
+  private static final String MASKED_TEXT = "####################";
+
+  /** Batch size, in bytes, passed to the DLP transforms under test. */
+  private static final int BATCH_SIZE_BYTES = 524000;
+
+  private static final InfoType EMAIL_ADDRESS =
+      InfoType.newBuilder().setName("EMAIL_ADDRESS").build();
+
+  private static final InspectConfig INSPECT_CONFIG =
+      InspectConfig.newBuilder()
+          .addInfoTypes(EMAIL_ADDRESS)
+          .setMinLikelihood(Likelihood.LIKELY)
+          .build();
+
+  @Test
+  public void inspectsText() {
+    String projectId = testPipeline.getOptions().as(GcpOptions.class).getProject();
+    PCollection<KV<String, InspectContentResponse>> inspectionResult =
+        testPipeline
+            .apply(Create.of(KV.of("", IDENTIFYING_TEXT)))
+            .apply(
+                DLPInspectText.newBuilder()
+                    .setBatchSizeBytes(BATCH_SIZE_BYTES)
+                    .setProjectId(projectId)
+                    .setInspectConfig(INSPECT_CONFIG)
+                    .build());
+    PAssert.that(inspectionResult).satisfies(new VerifyInspectionResult());
+    testPipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void deidentifiesText() {
+    String projectId = testPipeline.getOptions().as(GcpOptions.class).getProject();
+
+    PCollection<KV<String, DeidentifyContentResponse>> deidentificationResult =
+        testPipeline
+            .apply(Create.of(KV.of("", IDENTIFYING_TEXT)))
+            .apply(
+                DLPDeidentifyText.newBuilder()
+                    .setBatchSizeBytes(BATCH_SIZE_BYTES)
+                    .setProjectId(projectId)
+                    .setDeidentifyConfig(getDeidentifyConfig())
+                    .build());
+    PAssert.that(deidentificationResult).satisfies(new VerifyDeidentificationResult(MASKED_TEXT));
+    testPipeline.run().waitUntilFinish();
+  }
+
+  /** Builds a config that masks every {@link #EMAIL_ADDRESS} finding with the '#' character. */
+  private static DeidentifyConfig getDeidentifyConfig() {
+    CharacterMaskConfig characterMaskConfig =
+        CharacterMaskConfig.newBuilder().setMaskingCharacter("#").build();
+    PrimitiveTransformation primitiveTransformation =
+        PrimitiveTransformation.newBuilder().setCharacterMaskConfig(characterMaskConfig).build();
+    InfoTypeTransformations.InfoTypeTransformation infoTypeTransformation =
+        InfoTypeTransformations.InfoTypeTransformation.newBuilder()
+            .addInfoTypes(EMAIL_ADDRESS)
+            .setPrimitiveTransformation(primitiveTransformation)
+            .build();
+    return DeidentifyConfig.newBuilder()
+        .setInfoTypeTransformations(
+            InfoTypeTransformations.newBuilder().addTransformations(infoTypeTransformation).build())
+        .build();
+  }
+
+  /** Asserts that at least one inspection response contains an {@link #EMAIL_ADDRESS} finding. */
+  private static class VerifyInspectionResult
+      implements SerializableFunction<Iterable<KV<String, InspectContentResponse>>, Void> {
+    @Override
+    public Void apply(Iterable<KV<String, InspectContentResponse>> input) {
+      // Compare info types by name rather than whole-proto equality, so the check does not depend
+      // on any additional InfoType fields the service may populate in its findings. The collected
+      // names also make a failure report what was actually detected.
+      List<String> foundInfoTypeNames = new ArrayList<>();
+      input.forEach(
+          item -> {
+            for (Finding finding : item.getValue().getResult().getFindingsList()) {
+              foundInfoTypeNames.add(finding.getInfoType().getName());
+            }
+          });
+      assertTrue(
+          "Expected a " + EMAIL_ADDRESS.getName() + " finding, but found: " + foundInfoTypeNames,
+          foundInfoTypeNames.contains(EMAIL_ADDRESS.getName()));
+      return null;
+    }
+  }
+
+  /**
+   * Asserts that every de-identification response has a table with a "value" header, and that at
+   * least one cell across all responses equals the expected de-identified value.
+   */
+  private static class VerifyDeidentificationResult
+      implements SerializableFunction<Iterable<KV<String, DeidentifyContentResponse>>, Void> {
+    private final String expectedValue;
+
+    public VerifyDeidentificationResult(String expectedValue) {
+      this.expectedValue = expectedValue;
+    }
+
+    @Override
+    public Void apply(Iterable<KV<String, DeidentifyContentResponse>> input) {
+      // Gather every cell of every returned table so a failure shows what DLP actually produced.
+      List<String> cellValues = new ArrayList<>();
+      input.forEach(
+          item -> {
+            item.getValue()
+                .getItem()
+                .getTable()
+                .getRowsList()
+                .forEach(
+                    row ->
+                        row.getValuesList()
+                            .forEach(value -> cellValues.add(value.getStringValue())));
+            List<FieldId> headers = item.getValue().getItem().getTable().getHeadersList();
+            assertTrue(
+                "Expected a header column named 'value', but got: " + headers,
+                headers.contains(FieldId.newBuilder().setName("value").build()));
+          });
+      assertTrue(
+          "Expected a cell equal to '" + expectedValue + "', but got: " + cellValues,
+          cellValues.contains(expectedValue));
+      return null;
+    }
+  }
+}
diff --git a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/MapStringToDlpRowTest.java b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/MapStringToDlpRowTest.java
new file mode 100644
index 0000000..577a5dc
--- /dev/null
+++ b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/MapStringToDlpRowTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.privacy.dlp.v2.Table;
+import com.google.privacy.dlp.v2.Value;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link MapStringToDlpRow}. */
+@RunWith(JUnit4.class)
+public class MapStringToDlpRowTest {
+  @Rule public TestPipeline testPipeline = TestPipeline.create();
+
+  @Test
+  public void mapsStringToRow() {
+    // With a null delimiter, the whole value is expected to end up in a single cell.
+    PCollection<KV<String, Table.Row>> mapped =
+        testPipeline
+            .apply(Create.of(KV.of("key", "value")))
+            .apply(ParDo.of(new MapStringToDlpRow(null)));
+    PAssert.that(mapped).containsInAnyOrder(KV.of("key", rowOf("value")));
+    testPipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void mapsDelimitedStringToRow() {
+    // With a "," delimiter, each comma-separated part is expected to become its own cell.
+    PCollection<KV<String, Table.Row>> mapped =
+        testPipeline
+            .apply(Create.of(KV.of("key", "value,secondValue")))
+            .apply(ParDo.of(new MapStringToDlpRow(",")));
+    PAssert.that(mapped).containsInAnyOrder(KV.of("key", rowOf("value", "secondValue")));
+    testPipeline.run().waitUntilFinish();
+  }
+
+  /** Builds the expected DLP table row holding the given string cells, in order. */
+  private static Table.Row rowOf(String... cells) {
+    Table.Row.Builder row = Table.Row.newBuilder();
+    for (String cell : cells) {
+      row.addValues(Value.newBuilder().setStringValue(cell).build());
+    }
+    return row.build();
+  }
+}