You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mw...@apache.org on 2020/06/02 06:56:44 UTC
[beam] branch master updated: [BEAM-9723] Add DLP integration
transforms (#11566)
This is an automated email from the ASF dual-hosted git repository.
mwalenia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b6ca2ab [BEAM-9723] Add DLP integration transforms (#11566)
b6ca2ab is described below
commit b6ca2aba5a0141eed5bed29a9948e2c65874254f
Author: Michal Walenia <32...@users.noreply.github.com>
AuthorDate: Tue Jun 2 08:56:14 2020 +0200
[BEAM-9723] Add DLP integration transforms (#11566)
[BEAM-9723] Add Google Cloud DLP integration transforms.
* DLPDeidentifyText
* DLPReidentifyText
* DLPInspectText
are now available.
---
CHANGES.md | 4 +
sdks/java/extensions/ml/build.gradle | 15 +-
.../beam/sdk/extensions/ml/BatchRequestForDLP.java | 113 ++++++++
.../beam/sdk/extensions/ml/DLPDeidentifyText.java | 282 ++++++++++++++++++++
.../beam/sdk/extensions/ml/DLPInspectText.java | 240 +++++++++++++++++
.../beam/sdk/extensions/ml/DLPReidentifyText.java | 286 +++++++++++++++++++++
.../beam/sdk/extensions/ml/MapStringToDlpRow.java | 59 +++++
.../sdk/extensions/ml/BatchRequestForDlpTest.java | 63 +++++
.../sdk/extensions/ml/DLPDeidentifyTextTest.java | 101 ++++++++
.../beam/sdk/extensions/ml/DLPInspectTextTest.java | 101 ++++++++
.../sdk/extensions/ml/DLPReidentifyTextTest.java | 101 ++++++++
.../sdk/extensions/ml/DLPTextOperationsIT.java | 159 ++++++++++++
.../sdk/extensions/ml/MapStringToDlpRowTest.java | 69 +++++
13 files changed, 1591 insertions(+), 2 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 0168d04..d00b9c7 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -93,6 +93,10 @@
* `--direct_num_workers=0` is supported for FnApi runner. It will set the number of threads/subprocesses to number of cores of the machine executing the pipeline ([BEAM-9443](https://issues.apache.org/jira/browse/BEAM-9443)).
* Python SDK now has experimental support for SqlTransform ([BEAM-8603](https://issues.apache.org/jira/browse/BEAM-8603)).
* Add OnWindowExpiration method to Stateful DoFn ([BEAM-1589](https://issues.apache.org/jira/browse/BEAM-1589)).
+* Added PTransforms for Google Cloud DLP (Data Loss Prevention) services integration ([BEAM-9723](https://issues.apache.org/jira/browse/BEAM-9723)):
+ * Inspection of data,
+ * Deidentification of data,
+ * Reidentification of data.
* Add a more complete I/O support matrix in the documentation site ([BEAM-9916](https://issues.apache.org/jira/browse/BEAM-9916)).
* Upgrade Sphinx to 3.0.3 for building PyDoc.
diff --git a/sdks/java/extensions/ml/build.gradle b/sdks/java/extensions/ml/build.gradle
index 1f61c6a..c5e1ed3 100644
--- a/sdks/java/extensions/ml/build.gradle
+++ b/sdks/java/extensions/ml/build.gradle
@@ -1,3 +1,5 @@
+import groovy.json.JsonOutput
+
/*
*
* * Licensed to the Apache Software Foundation (ASF) under one
@@ -19,7 +21,7 @@
*/
plugins { id 'org.apache.beam.module' }
-applyJavaNature(automaticModuleName: 'org.apache.beam.sdk.extensions.protobuf')
+applyJavaNature(automaticModuleName: 'org.apache.beam.sdk.extensions.ml')
description = 'Apache Beam :: SDKs :: Java :: Extensions :: ML'
@@ -27,16 +29,25 @@ dependencies {
compile project(path: ":sdks:java:core", configuration: "shadow")
compile project(":sdks:java:expansion-service")
compile 'com.google.cloud:google-cloud-video-intelligence:1.2.0'
+ compile 'com.google.cloud:google-cloud-dlp:1.1.4'
compile 'com.google.cloud:google-cloud-language:1.99.4'
+ provided library.java.junit
testCompile project(path: ':sdks:java:core', configuration: 'shadowTest')
testCompile library.java.mockito_core
testCompile 'com.google.cloud:google-cloud-video-intelligence:1.2.0'
+ testCompile 'com.google.cloud:google-cloud-dlp:1.1.4'
+ testCompile project(path: ":sdks:java:extensions:google-cloud-platform-core", configuration: "testRuntime")
testCompile 'com.google.cloud:google-cloud-language:1.99.4'
- testCompile library.java.junit
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
testRuntimeOnly project(":runners:google-cloud-dataflow-java")
}
project.test {
+ // GCP project for the test pipelines; defaults to apache-beam-testing, override with -PgcpProject.
+ def gcpProject = project.findProperty("gcpProject") ?: 'apache-beam-testing'
include "**/**IT.class"
+ def pipelineOptions = [
+ "--project=${gcpProject}"
+ ]
+ systemProperty "beamTestPipelineOptions", JsonOutput.toJson(pipelineOptions)
}
diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/BatchRequestForDLP.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/BatchRequestForDLP.java
new file mode 100644
index 0000000..aabac4e
--- /dev/null
+++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/BatchRequestForDLP.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.privacy.dlp.v2.Table;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Batches input rows to reduce number of requests sent to Cloud DLP service.
+ *
+ * <p>Rows are buffered per key in a {@link BagState} until an event-time timer fires at the end
+ * of the window. The buffered rows are then emitted in batches whose summed serialized row size
+ * does not exceed the configured number of bytes; a single row larger than the limit is emitted
+ * in a batch of its own.
+ */
+@Experimental
+class BatchRequestForDLP extends DoFn<KV<String, Table.Row>, KV<String, Iterable<Table.Row>>> {
+  public static final Logger LOG = LoggerFactory.getLogger(BatchRequestForDLP.class);
+
+  private final Counter numberOfRowsBagged =
+      Metrics.counter(BatchRequestForDLP.class, "numberOfRowsBagged");
+
+  private final Integer batchSizeBytes;
+
+  @StateId("elementsBag")
+  private final StateSpec<BagState<KV<String, Table.Row>>> elementsBag = StateSpecs.bag();
+
+  @TimerId("eventTimer")
+  private final TimerSpec eventTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+  /**
+   * Constructs the batching DoFn.
+   *
+   * @param batchSize Desired batch size in bytes.
+   */
+  public BatchRequestForDLP(Integer batchSize) {
+    this.batchSizeBytes = batchSize;
+  }
+
+  /**
+   * Buffers the element in state and (re)sets the timer to fire at the end of the element's
+   * window.
+   */
+  @ProcessElement
+  public void process(
+      @Element KV<String, Table.Row> element,
+      @StateId("elementsBag") BagState<KV<String, Table.Row>> elementsBag,
+      @TimerId("eventTimer") Timer eventTimer,
+      BoundedWindow w) {
+    elementsBag.add(element);
+    eventTimer.set(w.maxTimestamp());
+  }
+
+  /**
+   * Outputs the elements buffered in the elementsBag in batches of desired size, then clears the
+   * buffer.
+   *
+   * <p>A new list is started after every emitted batch: an element handed to the {@link
+   * OutputReceiver} must not be mutated afterwards, so the emitted list is never reused.
+   *
+   * @param elementsBag element buffer.
+   * @param output Batched input elements.
+   */
+  @OnTimer("eventTimer")
+  public void onTimer(
+      @StateId("elementsBag") BagState<KV<String, Table.Row>> elementsBag,
+      OutputReceiver<KV<String, Iterable<Table.Row>>> output) {
+    String key = null;
+    List<Table.Row> rows = new ArrayList<>();
+    int bufferSize = 0;
+    for (KV<String, Table.Row> element : elementsBag.read()) {
+      // State is partitioned per key, so every buffered element carries the same key.
+      key = element.getKey();
+      Table.Row row = element.getValue();
+      int elementSize = row.getSerializedSize();
+      // Only flush a non-empty batch; otherwise an oversized first row would emit an empty batch.
+      if (!rows.isEmpty() && bufferSize + elementSize > batchSizeBytes) {
+        LOG.debug("Clear buffer of {} bytes, Key {}", bufferSize, key);
+        numberOfRowsBagged.inc(rows.size());
+        output.output(KV.of(key, rows));
+        rows = new ArrayList<>();
+        bufferSize = 0;
+      }
+      rows.add(row);
+      bufferSize += elementSize;
+    }
+    if (!rows.isEmpty()) {
+      LOG.debug("Outputting remaining {} rows.", rows.size());
+      numberOfRowsBagged.inc(rows.size());
+      output.output(KV.of(key, rows));
+    }
+    elementsBag.clear();
+  }
+}
diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyText.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyText.java
new file mode 100644
index 0000000..0502950
--- /dev/null
+++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyText.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.dlp.v2.DlpServiceClient;
+import com.google.privacy.dlp.v2.ContentItem;
+import com.google.privacy.dlp.v2.DeidentifyConfig;
+import com.google.privacy.dlp.v2.DeidentifyContentRequest;
+import com.google.privacy.dlp.v2.DeidentifyContentResponse;
+import com.google.privacy.dlp.v2.FieldId;
+import com.google.privacy.dlp.v2.InspectConfig;
+import com.google.privacy.dlp.v2.ProjectName;
+import com.google.privacy.dlp.v2.Table;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * A {@link PTransform} connecting to Cloud DLP (https://cloud.google.com/dlp/docs/libraries) and
+ * deidentifying text according to provided settings. The transform supports both columnar delimited
+ * input data (eg. CSV) and unstructured input.
+ *
+ * <p>If headerColumns (a {@link PCollectionView} with the table headers) is set, columnDelimiter
+ * must be set as well, and vice versa; otherwise {@link Builder#build()} throws. If neither is set,
+ * input is assumed to be unstructured.
+ *
+ * <p>Either deidentifyTemplateName (String) or deidentifyConfig {@link DeidentifyConfig} needs to
+ * be set. inspectTemplateName and inspectConfig ({@link InspectConfig}) are optional.
+ *
+ * <p>Batch size (in bytes) limits how many rows are grouped into a single DLP request. It must be
+ * positive and no larger than {@link #DLP_PAYLOAD_LIMIT_BYTES}.
+ *
+ * <p>The transform consumes {@link KV} of {@link String}s (assumed to be filename as key and
+ * contents as value) and outputs {@link KV} of {@link String} (eg. filename) and {@link
+ * DeidentifyContentResponse}, which will contain {@link Table} of results for the user to consume.
+ */
+@Experimental
+@AutoValue
+public abstract class DLPDeidentifyText
+    extends PTransform<
+        PCollection<KV<String, String>>, PCollection<KV<String, DeidentifyContentResponse>>> {
+
+  public static final Integer DLP_PAYLOAD_LIMIT_BYTES = 524000;
+
+  /** @return Template name for data inspection. */
+  @Nullable
+  public abstract String getInspectTemplateName();
+
+  /** @return Template name for data deidentification. */
+  @Nullable
+  public abstract String getDeidentifyTemplateName();
+
+  /**
+   * @return Configuration object for data inspection. If present, supersedes the template settings.
+   */
+  @Nullable
+  public abstract InspectConfig getInspectConfig();
+
+  /** @return Configuration object for deidentification. If present, supersedes the template. */
+  @Nullable
+  public abstract DeidentifyConfig getDeidentifyConfig();
+
+  /** @return List of column names if the input KV value is a delimited row. */
+  @Nullable
+  public abstract PCollectionView<List<String>> getHeaderColumns();
+
+  /** @return Delimiter to be used when splitting values from input strings into columns. */
+  @Nullable
+  public abstract String getColumnDelimiter();
+
+  /** @return Size of input elements batch to be sent to Cloud DLP service in one request. */
+  public abstract Integer getBatchSizeBytes();
+
+  /** @return ID of Google Cloud project to be used when deidentifying data. */
+  public abstract String getProjectId();
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+    /** @param inspectTemplateName Template name for data inspection. */
+    public abstract Builder setInspectTemplateName(String inspectTemplateName);
+
+    /** @param headerColumns List of column names if the input KV value is a delimited row. */
+    public abstract Builder setHeaderColumns(PCollectionView<List<String>> headerColumns);
+
+    /**
+     * @param delimiter Delimiter to be used when splitting values from input strings into columns.
+     */
+    public abstract Builder setColumnDelimiter(String delimiter);
+
+    /**
+     * @param batchSize Size of input elements batch to be sent to Cloud DLP service in one request.
+     */
+    public abstract Builder setBatchSizeBytes(Integer batchSize);
+
+    /** @param projectId ID of Google Cloud project to be used when deidentifying data. */
+    public abstract Builder setProjectId(String projectId);
+
+    /** @param deidentifyTemplateName Template name for data deidentification. */
+    public abstract Builder setDeidentifyTemplateName(String deidentifyTemplateName);
+
+    /**
+     * @param inspectConfig Configuration object for data inspection. If present, supersedes the
+     *     template settings.
+     */
+    public abstract Builder setInspectConfig(InspectConfig inspectConfig);
+
+    /**
+     * @param deidentifyConfig Configuration object for data deidentification. If present,
+     *     supersedes the template settings.
+     */
+    public abstract Builder setDeidentifyConfig(DeidentifyConfig deidentifyConfig);
+
+    abstract DLPDeidentifyText autoBuild();
+
+    /**
+     * Builds the transform and validates its settings.
+     *
+     * @throws IllegalArgumentException if neither deidentifyConfig nor deidentifyTemplateName is
+     *     set, if the batch size is not positive or exceeds {@link
+     *     DLPDeidentifyText#DLP_PAYLOAD_LIMIT_BYTES}, or if only one of columnDelimiter and
+     *     headerColumns is set.
+     */
+    public DLPDeidentifyText build() {
+      DLPDeidentifyText dlpDeidentifyText = autoBuild();
+      if (dlpDeidentifyText.getDeidentifyConfig() == null
+          && dlpDeidentifyText.getDeidentifyTemplateName() == null) {
+        throw new IllegalArgumentException(
+            "Either deidentifyConfig or deidentifyTemplateName need to be set!");
+      }
+      if (dlpDeidentifyText.getBatchSizeBytes() <= 0) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Batch size must be positive, but was %d.",
+                dlpDeidentifyText.getBatchSizeBytes()));
+      }
+      if (dlpDeidentifyText.getBatchSizeBytes() > DLP_PAYLOAD_LIMIT_BYTES) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Batch size is too large! It should be smaller or equal than %d.",
+                DLP_PAYLOAD_LIMIT_BYTES));
+      }
+      if (dlpDeidentifyText.getColumnDelimiter() == null
+          && dlpDeidentifyText.getHeaderColumns() != null) {
+        throw new IllegalArgumentException(
+            "Column delimiter should be set if headers are present.");
+      }
+      if (dlpDeidentifyText.getHeaderColumns() == null
+          && dlpDeidentifyText.getColumnDelimiter() != null) {
+        throw new IllegalArgumentException(
+            "Column headers should be supplied when delimiter is present.");
+      }
+      return dlpDeidentifyText;
+    }
+  }
+
+  public static DLPDeidentifyText.Builder newBuilder() {
+    return new AutoValue_DLPDeidentifyText.Builder();
+  }
+
+  /**
+   * The transform converts the contents of input PCollection into {@link Table.Row}s and then calls
+   * Cloud DLP service to perform the deidentification according to provided settings.
+   *
+   * @param input input PCollection
+   * @return PCollection after transformations
+   */
+  @Override
+  public PCollection<KV<String, DeidentifyContentResponse>> expand(
+      PCollection<KV<String, String>> input) {
+    ParDo.SingleOutput<KV<String, Iterable<Table.Row>>, KV<String, DeidentifyContentResponse>>
+        deidentifyParDo =
+            ParDo.of(
+                new DeidentifyText(
+                    getProjectId(),
+                    getInspectTemplateName(),
+                    getDeidentifyTemplateName(),
+                    getInspectConfig(),
+                    getDeidentifyConfig(),
+                    getHeaderColumns()));
+    if (getHeaderColumns() != null) {
+      // DeidentifyText reads the headers with sideInput(), which only works for views
+      // registered on the ParDo.
+      deidentifyParDo = deidentifyParDo.withSideInputs(getHeaderColumns());
+    }
+    return input
+        .apply(ParDo.of(new MapStringToDlpRow(getColumnDelimiter())))
+        .apply("Batch Contents", ParDo.of(new BatchRequestForDLP(getBatchSizeBytes())))
+        .apply("DLPDeidentify", deidentifyParDo);
+  }
+
+  /** DoFn performing calls to Cloud DLP service on GCP. */
+  static class DeidentifyText
+      extends DoFn<KV<String, Iterable<Table.Row>>, KV<String, DeidentifyContentResponse>> {
+    private final String projectId;
+    private final String inspectTemplateName;
+    private final String deidentifyTemplateName;
+    private final InspectConfig inspectConfig;
+    private final DeidentifyConfig deidentifyConfig;
+    private final PCollectionView<List<String>> headerColumns;
+    private transient DeidentifyContentRequest.Builder requestBuilder;
+    private transient DlpServiceClient dlpServiceClient;
+
+    /**
+     * @param projectId ID of GCP project that should be used for deidentification.
+     * @param inspectTemplateName Template name for inspection. Optional.
+     * @param deidentifyTemplateName Template name for deidentification. Either this or
+     *     deidentifyConfig is required.
+     * @param inspectConfig Configuration object for inspection. Optional.
+     * @param deidentifyConfig Deidentification config containing data transformations. Either this
+     *     or deidentifyTemplateName is required.
+     * @param headerColumns Header row of the table if applicable. When non-null, the view must be
+     *     registered as a side input of the ParDo running this DoFn.
+     */
+    public DeidentifyText(
+        String projectId,
+        String inspectTemplateName,
+        String deidentifyTemplateName,
+        InspectConfig inspectConfig,
+        DeidentifyConfig deidentifyConfig,
+        PCollectionView<List<String>> headerColumns) {
+      this.projectId = projectId;
+      this.inspectTemplateName = inspectTemplateName;
+      this.deidentifyTemplateName = deidentifyTemplateName;
+      this.inspectConfig = inspectConfig;
+      this.deidentifyConfig = deidentifyConfig;
+      this.headerColumns = headerColumns;
+    }
+
+    /** Prepares the request template shared by all elements and creates the DLP client. */
+    @Setup
+    public void setup() throws IOException {
+      requestBuilder =
+          DeidentifyContentRequest.newBuilder().setParent(ProjectName.of(projectId).toString());
+      if (inspectTemplateName != null) {
+        requestBuilder.setInspectTemplateName(inspectTemplateName);
+      }
+      if (inspectConfig != null) {
+        requestBuilder.setInspectConfig(inspectConfig);
+      }
+      if (deidentifyConfig != null) {
+        requestBuilder.setDeidentifyConfig(deidentifyConfig);
+      }
+      if (deidentifyTemplateName != null) {
+        requestBuilder.setDeidentifyTemplateName(deidentifyTemplateName);
+      }
+      dlpServiceClient = DlpServiceClient.create();
+    }
+
+    /** Closes the DLP client, if {@link #setup()} got far enough to create it. */
+    @Teardown
+    public void teardown() {
+      if (dlpServiceClient != null) {
+        dlpServiceClient.close();
+      }
+    }
+
+    /**
+     * Wraps the batched rows of one element into a DLP {@link Table} and sends it for
+     * deidentification, emitting the response under the element's key.
+     */
+    @ProcessElement
+    public void processElement(ProcessContext c) throws IOException {
+      String fileName = c.element().getKey();
+      List<FieldId> dlpTableHeaders;
+      if (headerColumns != null) {
+        dlpTableHeaders =
+            c.sideInput(headerColumns).stream()
+                .map(header -> FieldId.newBuilder().setName(header).build())
+                .collect(Collectors.toList());
+      } else {
+        // handle unstructured input: a single column with header "value"
+        dlpTableHeaders = new ArrayList<>();
+        dlpTableHeaders.add(FieldId.newBuilder().setName("value").build());
+      }
+      Table table =
+          Table.newBuilder()
+              .addAllHeaders(dlpTableHeaders)
+              .addAllRows(c.element().getValue())
+              .build();
+      ContentItem contentItem = ContentItem.newBuilder().setTable(table).build();
+      this.requestBuilder.setItem(contentItem);
+      DeidentifyContentResponse response =
+          dlpServiceClient.deidentifyContent(this.requestBuilder.build());
+      c.output(KV.of(fileName, response));
+    }
+  }
+}
diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPInspectText.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPInspectText.java
new file mode 100644
index 0000000..ff58674
--- /dev/null
+++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPInspectText.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.dlp.v2.DlpServiceClient;
+import com.google.privacy.dlp.v2.ContentItem;
+import com.google.privacy.dlp.v2.FieldId;
+import com.google.privacy.dlp.v2.InspectConfig;
+import com.google.privacy.dlp.v2.InspectContentRequest;
+import com.google.privacy.dlp.v2.InspectContentResponse;
+import com.google.privacy.dlp.v2.ProjectName;
+import com.google.privacy.dlp.v2.Table;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * A {@link PTransform} connecting to Cloud DLP (https://cloud.google.com/dlp/docs/libraries) and
+ * inspecting text for identifying data according to provided settings. The transform supports both
+ * delimited columnar input data (eg. CSV) and unstructured input.
+ *
+ * <p>If headerColumns (a {@link PCollectionView} with the table headers) is set, columnDelimiter
+ * must be set as well, and vice versa; otherwise {@link Builder#build()} throws. If neither is set,
+ * input is assumed to be unstructured.
+ *
+ * <p>Either inspectTemplateName (String) or inspectConfig {@link InspectConfig} needs to be set.
+ *
+ * <p>Batch size (in bytes) limits how many rows are grouped into a single DLP request. It must be
+ * positive and no larger than {@link #DLP_PAYLOAD_LIMIT_BYTES}.
+ *
+ * <p>The transform consumes {@link KV} of {@link String}s (assumed to be filename as key and
+ * contents as value) and outputs {@link KV} of {@link String} (eg. filename) and {@link
+ * InspectContentResponse}, which will contain a list of {@link
+ * com.google.privacy.dlp.v2.InspectResult} for the user to consume.
+ */
+@Experimental
+@AutoValue
+public abstract class DLPInspectText
+    extends PTransform<
+        PCollection<KV<String, String>>, PCollection<KV<String, InspectContentResponse>>> {
+
+  public static final Integer DLP_PAYLOAD_LIMIT_BYTES = 524000;
+
+  /** @return Template name for data inspection. */
+  @Nullable
+  public abstract String getInspectTemplateName();
+
+  /**
+   * @return Configuration object for data inspection. If present, supersedes the template settings.
+   */
+  @Nullable
+  public abstract InspectConfig getInspectConfig();
+
+  /** @return Size of input elements batch to be sent to Cloud DLP service in one request. */
+  public abstract Integer getBatchSizeBytes();
+
+  /** @return ID of Google Cloud project to be used when inspecting data. */
+  public abstract String getProjectId();
+
+  /** @return Delimiter to be used when splitting values from input strings into columns. */
+  @Nullable
+  public abstract String getColumnDelimiter();
+
+  /** @return List of column names if the input KV value is a delimited row. */
+  @Nullable
+  public abstract PCollectionView<List<String>> getHeaderColumns();
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+    /** @param inspectTemplateName Template name for data inspection. */
+    public abstract Builder setInspectTemplateName(String inspectTemplateName);
+
+    /**
+     * @param inspectConfig Configuration object for data inspection. If present, supersedes the
+     *     template settings.
+     */
+    public abstract Builder setInspectConfig(InspectConfig inspectConfig);
+
+    /**
+     * @param batchSize Size of input elements batch to be sent to Cloud DLP service in one request.
+     */
+    public abstract Builder setBatchSizeBytes(Integer batchSize);
+
+    /** @param projectId ID of Google Cloud project to be used when inspecting data. */
+    public abstract Builder setProjectId(String projectId);
+
+    /**
+     * @param delimiter Delimiter to be used when splitting values from input strings into columns.
+     */
+    public abstract Builder setColumnDelimiter(String delimiter);
+
+    /** @param headerColumns List of column names if the input KV value is a delimited row. */
+    public abstract Builder setHeaderColumns(PCollectionView<List<String>> headerColumns);
+
+    abstract DLPInspectText autoBuild();
+
+    /**
+     * Builds the transform and validates its settings.
+     *
+     * @throws IllegalArgumentException if neither inspectTemplateName nor inspectConfig is set, if
+     *     the batch size is not positive or exceeds {@link
+     *     DLPInspectText#DLP_PAYLOAD_LIMIT_BYTES}, or if only one of columnDelimiter and
+     *     headerColumns is set.
+     */
+    public DLPInspectText build() {
+      DLPInspectText inspectText = autoBuild();
+      if (inspectText.getInspectTemplateName() == null && inspectText.getInspectConfig() == null) {
+        throw new IllegalArgumentException(
+            "Either inspectTemplateName or inspectConfig must be supplied!");
+      }
+      if (inspectText.getBatchSizeBytes() <= 0) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Batch size must be positive, but was %d.", inspectText.getBatchSizeBytes()));
+      }
+      if (inspectText.getBatchSizeBytes() > DLP_PAYLOAD_LIMIT_BYTES) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Batch size is too large! It should be smaller or equal than %d.",
+                DLP_PAYLOAD_LIMIT_BYTES));
+      }
+      if (inspectText.getColumnDelimiter() == null && inspectText.getHeaderColumns() != null) {
+        throw new IllegalArgumentException(
+            "Column delimiter should be set if headers are present.");
+      }
+      if (inspectText.getHeaderColumns() == null && inspectText.getColumnDelimiter() != null) {
+        throw new IllegalArgumentException(
+            "Column headers should be supplied when delimiter is present.");
+      }
+      return inspectText;
+    }
+  }
+
+  public static Builder newBuilder() {
+    return new AutoValue_DLPInspectText.Builder();
+  }
+
+  /**
+   * The transform converts the contents of input PCollection into {@link Table.Row}s and then calls
+   * Cloud DLP service to perform the data inspection according to provided settings.
+   *
+   * @param input input PCollection
+   * @return PCollection after transformations
+   */
+  @Override
+  public PCollection<KV<String, InspectContentResponse>> expand(
+      PCollection<KV<String, String>> input) {
+    ParDo.SingleOutput<KV<String, Iterable<Table.Row>>, KV<String, InspectContentResponse>>
+        inspectParDo =
+            ParDo.of(
+                new InspectData(
+                    getProjectId(),
+                    getInspectTemplateName(),
+                    getInspectConfig(),
+                    getHeaderColumns()));
+    if (getHeaderColumns() != null) {
+      // InspectData reads the headers with sideInput(), which only works for views registered
+      // on the ParDo.
+      inspectParDo = inspectParDo.withSideInputs(getHeaderColumns());
+    }
+    return input
+        .apply(ParDo.of(new MapStringToDlpRow(getColumnDelimiter())))
+        .apply("Batch Contents", ParDo.of(new BatchRequestForDLP(getBatchSizeBytes())))
+        .apply("DLPInspect", inspectParDo);
+  }
+
+  /** Performs calls to Cloud DLP service on GCP to inspect input data. */
+  static class InspectData
+      extends DoFn<KV<String, Iterable<Table.Row>>, KV<String, InspectContentResponse>> {
+    private final String projectId;
+    private final String inspectTemplateName;
+    private final InspectConfig inspectConfig;
+    private final PCollectionView<List<String>> headerColumns;
+    private transient DlpServiceClient dlpServiceClient;
+    private transient InspectContentRequest.Builder requestBuilder;
+
+    /**
+     * @param projectId ID of GCP project that should be used for data inspection.
+     * @param inspectTemplateName Template name for inspection.
+     * @param inspectConfig Configuration object for inspection.
+     * @param headerColumns Header row of the table if applicable. When non-null, the view must be
+     *     registered as a side input of the ParDo running this DoFn.
+     */
+    public InspectData(
+        String projectId,
+        String inspectTemplateName,
+        InspectConfig inspectConfig,
+        PCollectionView<List<String>> headerColumns) {
+      this.projectId = projectId;
+      this.inspectTemplateName = inspectTemplateName;
+      this.inspectConfig = inspectConfig;
+      this.headerColumns = headerColumns;
+    }
+
+    /** Prepares the request template shared by all elements and creates the DLP client. */
+    @Setup
+    public void setup() throws IOException {
+      this.requestBuilder =
+          InspectContentRequest.newBuilder().setParent(ProjectName.of(this.projectId).toString());
+      if (inspectTemplateName != null) {
+        requestBuilder.setInspectTemplateName(this.inspectTemplateName);
+      }
+      if (inspectConfig != null) {
+        requestBuilder.setInspectConfig(inspectConfig);
+      }
+      dlpServiceClient = DlpServiceClient.create();
+    }
+
+    /** Closes the DLP client, if {@link #setup()} got far enough to create it. */
+    @Teardown
+    public void teardown() {
+      if (dlpServiceClient != null) {
+        dlpServiceClient.close();
+      }
+    }
+
+    /**
+     * Wraps the batched rows of one element into a DLP {@link Table} and sends it for inspection,
+     * emitting the response under the element's key.
+     */
+    @ProcessElement
+    public void processElement(ProcessContext c) throws IOException {
+      List<FieldId> tableHeaders;
+      if (headerColumns != null) {
+        tableHeaders =
+            c.sideInput(headerColumns).stream()
+                .map(header -> FieldId.newBuilder().setName(header).build())
+                .collect(Collectors.toList());
+      } else {
+        // handle unstructured input: a single column with header "value"
+        tableHeaders = new ArrayList<>();
+        tableHeaders.add(FieldId.newBuilder().setName("value").build());
+      }
+      Table table =
+          Table.newBuilder().addAllHeaders(tableHeaders).addAllRows(c.element().getValue()).build();
+      ContentItem contentItem = ContentItem.newBuilder().setTable(table).build();
+      this.requestBuilder.setItem(contentItem);
+      InspectContentResponse response =
+          dlpServiceClient.inspectContent(this.requestBuilder.build());
+      c.output(KV.of(c.element().getKey(), response));
+    }
+  }
+}
diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPReidentifyText.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPReidentifyText.java
new file mode 100644
index 0000000..5dc20cb
--- /dev/null
+++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPReidentifyText.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.dlp.v2.DlpServiceClient;
+import com.google.privacy.dlp.v2.ContentItem;
+import com.google.privacy.dlp.v2.DeidentifyConfig;
+import com.google.privacy.dlp.v2.FieldId;
+import com.google.privacy.dlp.v2.InspectConfig;
+import com.google.privacy.dlp.v2.ProjectName;
+import com.google.privacy.dlp.v2.ReidentifyContentRequest;
+import com.google.privacy.dlp.v2.ReidentifyContentResponse;
+import com.google.privacy.dlp.v2.Table;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * A {@link PTransform} connecting to Cloud DLP (https://cloud.google.com/dlp/docs/libraries) and
+ * reidentifying text according to provided settings.
+ *
+ * <p>The transform supports both delimited columnar input data and unstructured input.
+ *
+ * <p>Delimited input requires both the columnDelimiter property and the headerColumns side input;
+ * the builder rejects either one on its own. If neither is set, each input value is treated as a
+ * single unstructured column named "value".
+ *
+ * <p>The transform consumes {@link KV} of {@link String}s (assumed to be filename as key and
+ * contents as value) and outputs {@link KV} of {@link String} (eg. filename) and {@link
+ * ReidentifyContentResponse}, which will contain {@link Table} of results for the user to consume.
+ *
+ * <p>Batch size defines how big are batches sent to DLP at once in bytes. It may not exceed {@link
+ * #DLP_PAYLOAD_LIMIT_BYTES}.
+ *
+ * <p>Either reidentifyTemplateName {@link String} or reidentifyConfig {@link DeidentifyConfig}
+ * needs to be set. inspectConfig {@link InspectConfig} and inspectTemplateName {@link String} are
+ * optional.
+ */
+@Experimental
+@AutoValue
+public abstract class DLPReidentifyText
+    extends PTransform<
+        PCollection<KV<String, String>>, PCollection<KV<String, ReidentifyContentResponse>>> {
+
+  /** Largest batch size, in bytes, accepted by {@link Builder#build()}. */
+  public static final Integer DLP_PAYLOAD_LIMIT_BYTES = 524000;
+
+  /** @return Template name for data inspection. */
+  @Nullable
+  public abstract String getInspectTemplateName();
+
+  /** @return Template name for data reidentification. */
+  @Nullable
+  public abstract String getReidentifyTemplateName();
+
+  /**
+   * @return Configuration object for data inspection. If present, supersedes the template settings.
+   */
+  @Nullable
+  public abstract InspectConfig getInspectConfig();
+
+  /** @return Configuration object for reidentification. If present, supersedes the template. */
+  @Nullable
+  public abstract DeidentifyConfig getReidentifyConfig();
+
+  /** @return Delimiter to be used when splitting values from input strings into columns. */
+  @Nullable
+  public abstract String getColumnDelimiter();
+
+  /** @return List of column names if the input KV value is a delimited row. */
+  @Nullable
+  public abstract PCollectionView<List<String>> getHeaderColumns();
+
+  /** @return Size of input elements batch to be sent to Cloud DLP service in one request. */
+  public abstract Integer getBatchSizeBytes();
+
+  /** @return ID of Google Cloud project to be used when reidentifying data. */
+  public abstract String getProjectId();
+
+  /** Builder for {@link DLPReidentifyText}. */
+  @AutoValue.Builder
+  public abstract static class Builder {
+    /** @param inspectTemplateName Template name for data inspection. */
+    public abstract Builder setInspectTemplateName(String inspectTemplateName);
+
+    /**
+     * @param inspectConfig Configuration object for data inspection. If present, supersedes the
+     *     template settings.
+     */
+    public abstract Builder setInspectConfig(InspectConfig inspectConfig);
+
+    /**
+     * @param reidentifyConfig Configuration object for data reidentification. If present,
+     *     supersedes the template settings.
+     */
+    public abstract Builder setReidentifyConfig(DeidentifyConfig reidentifyConfig);
+
+    /** @param reidentifyTemplateName Template name for data reidentification. */
+    public abstract Builder setReidentifyTemplateName(String reidentifyTemplateName);
+
+    /**
+     * @param batchSize Size of input elements batch to be sent to Cloud DLP service in one request.
+     *     Must not exceed {@link DLPReidentifyText#DLP_PAYLOAD_LIMIT_BYTES}.
+     */
+    public abstract Builder setBatchSizeBytes(Integer batchSize);
+
+    /** @param headerColumns List of column names if the input KV value is a delimited row. */
+    public abstract Builder setHeaderColumns(PCollectionView<List<String>> headerColumns);
+
+    /**
+     * @param delimiter Delimiter to be used when splitting values from input strings into columns.
+     */
+    public abstract Builder setColumnDelimiter(String delimiter);
+
+    /** @param projectId ID of Google Cloud project to be used when reidentifying data. */
+    public abstract Builder setProjectId(String projectId);
+
+    abstract DLPReidentifyText autoBuild();
+
+    /**
+     * Builds the transform and validates its settings.
+     *
+     * @throws IllegalArgumentException if neither reidentifyConfig nor reidentifyTemplateName is
+     *     set, if the batch size exceeds {@link DLPReidentifyText#DLP_PAYLOAD_LIMIT_BYTES}, or if
+     *     only one of columnDelimiter and headerColumns is set.
+     */
+    public DLPReidentifyText build() {
+      DLPReidentifyText dlpReidentifyText = autoBuild();
+      if (dlpReidentifyText.getReidentifyConfig() == null
+          && dlpReidentifyText.getReidentifyTemplateName() == null) {
+        throw new IllegalArgumentException(
+            "Either reidentifyConfig or reidentifyTemplateName need to be set!");
+      }
+      if (dlpReidentifyText.getBatchSizeBytes() > DLP_PAYLOAD_LIMIT_BYTES) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Batch size is too large! It should be smaller or equal than %d.",
+                DLP_PAYLOAD_LIMIT_BYTES));
+      }
+      if (dlpReidentifyText.getColumnDelimiter() == null
+          && dlpReidentifyText.getHeaderColumns() != null) {
+        throw new IllegalArgumentException(
+            "Column delimiter should be set if headers are present.");
+      }
+      if (dlpReidentifyText.getHeaderColumns() == null
+          && dlpReidentifyText.getColumnDelimiter() != null) {
+        throw new IllegalArgumentException(
+            "Column headers should be supplied when delimiter is present.");
+      }
+      return dlpReidentifyText;
+    }
+  }
+
+  public static DLPReidentifyText.Builder newBuilder() {
+    return new AutoValue_DLPReidentifyText.Builder();
+  }
+
+  /**
+   * The transform converts the contents of input PCollection into {@link Table.Row}s and then calls
+   * Cloud DLP service to perform the reidentification according to provided settings.
+   *
+   * @param input input PCollection
+   * @return PCollection after transformations
+   */
+  @Override
+  public PCollection<KV<String, ReidentifyContentResponse>> expand(
+      PCollection<KV<String, String>> input) {
+    ParDo.SingleOutput<KV<String, Iterable<Table.Row>>, KV<String, ReidentifyContentResponse>>
+        reidentifyParDo =
+            ParDo.of(
+                new ReidentifyText(
+                    getProjectId(),
+                    getInspectTemplateName(),
+                    getReidentifyTemplateName(),
+                    getInspectConfig(),
+                    getReidentifyConfig(),
+                    getHeaderColumns()));
+    if (getHeaderColumns() != null) {
+      // ReidentifyText reads the headers with sideInput(); Beam only allows that for views
+      // registered on the ParDo, otherwise delimited input fails at runtime.
+      reidentifyParDo = reidentifyParDo.withSideInputs(getHeaderColumns());
+    }
+    return input
+        .apply(ParDo.of(new MapStringToDlpRow(getColumnDelimiter())))
+        .apply("Batch Contents", ParDo.of(new BatchRequestForDLP(getBatchSizeBytes())))
+        .apply("DLPReidentify", reidentifyParDo);
+  }
+
+  /** Performs the calls to Cloud DLP service on GCP. */
+  static class ReidentifyText
+      extends DoFn<KV<String, Iterable<Table.Row>>, KV<String, ReidentifyContentResponse>> {
+    private final String projectId;
+    private final String inspectTemplateName;
+    private final String reidentifyTemplateName;
+    private final InspectConfig inspectConfig;
+    private final DeidentifyConfig reidentifyConfig;
+    private final PCollectionView<List<String>> headerColumns;
+    private transient ReidentifyContentRequest.Builder requestBuilder;
+    private transient DlpServiceClient dlpServiceClient;
+
+    /**
+     * @param projectId ID of GCP project that should be used for reidentification.
+     * @param inspectTemplateName Template name for inspection. Optional.
+     * @param reidentifyTemplateName Template name for reidentification. Either this or
+     *     reidentifyConfig is required.
+     * @param inspectConfig Configuration object for inspection. Optional.
+     * @param reidentifyConfig Reidentification config containing data transformations. Either this
+     *     or reidentifyTemplateName is required.
+     * @param headerColumns Header row of the table if applicable. The ParDo applying this DoFn must
+     *     register it as a side input.
+     */
+    public ReidentifyText(
+        String projectId,
+        String inspectTemplateName,
+        String reidentifyTemplateName,
+        InspectConfig inspectConfig,
+        DeidentifyConfig reidentifyConfig,
+        PCollectionView<List<String>> headerColumns) {
+      this.projectId = projectId;
+      this.inspectTemplateName = inspectTemplateName;
+      this.reidentifyTemplateName = reidentifyTemplateName;
+      this.inspectConfig = inspectConfig;
+      this.reidentifyConfig = reidentifyConfig;
+      this.headerColumns = headerColumns;
+    }
+
+    /** Builds the request template reused for every element and opens the DLP client. */
+    @Setup
+    public void setup() throws IOException {
+      requestBuilder =
+          ReidentifyContentRequest.newBuilder().setParent(ProjectName.of(projectId).toString());
+      if (inspectTemplateName != null) {
+        requestBuilder.setInspectTemplateName(inspectTemplateName);
+      }
+      if (inspectConfig != null) {
+        requestBuilder.setInspectConfig(inspectConfig);
+      }
+      if (reidentifyConfig != null) {
+        requestBuilder.setReidentifyConfig(reidentifyConfig);
+      }
+      if (reidentifyTemplateName != null) {
+        requestBuilder.setReidentifyTemplateName(reidentifyTemplateName);
+      }
+      dlpServiceClient = DlpServiceClient.create();
+    }
+
+    /**
+     * Closes the DLP client. Tolerates a {@code setup()} that failed before the client was created,
+     * so the original failure is not hidden behind a NullPointerException.
+     */
+    @Teardown
+    public void teardown() {
+      if (dlpServiceClient != null) {
+        dlpServiceClient.close();
+      }
+    }
+
+    /**
+     * Wraps the batch of rows for one key into a DLP {@link Table}, sends it for reidentification
+     * and emits the response under the same key.
+     */
+    @ProcessElement
+    public void processElement(ProcessContext context) throws IOException {
+      List<FieldId> tableHeaders;
+      if (headerColumns != null) {
+        tableHeaders =
+            context.sideInput(headerColumns).stream()
+                .map(header -> FieldId.newBuilder().setName(header).build())
+                .collect(Collectors.toList());
+      } else {
+        // Unstructured input: each row carries a single column named "value".
+        tableHeaders = new ArrayList<>();
+        tableHeaders.add(FieldId.newBuilder().setName("value").build());
+      }
+      Table table =
+          Table.newBuilder()
+              .addAllHeaders(tableHeaders)
+              .addAllRows(context.element().getValue())
+              .build();
+      ContentItem contentItem = ContentItem.newBuilder().setTable(table).build();
+      this.requestBuilder.setItem(contentItem);
+      ReidentifyContentResponse response =
+          dlpServiceClient.reidentifyContent(requestBuilder.build());
+      context.output(KV.of(context.element().getKey(), response));
+    }
+  }
+}
diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/MapStringToDlpRow.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/MapStringToDlpRow.java
new file mode 100644
index 0000000..944656c
--- /dev/null
+++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/MapStringToDlpRow.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.privacy.dlp.v2.Table;
+import com.google.privacy.dlp.v2.Value;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * Maps {@link KV}s of {@link String}s into {@code KV<String, Table.Row>} for further processing in
+ * the DLP transforms.
+ *
+ * <p>If a delimiter of values isn't provided, input is assumed to be unstructured and the input KV
+ * value is saved in a single column of output {@link Table.Row}.
+ *
+ * <p>A provided delimiter is passed to {@link String#split(String, int)}, so it is interpreted as
+ * a regular expression (a literal pipe, for example, must be given as {@code "\\|"}). Trailing
+ * empty fields are kept, so a row always has one value per delimited column.
+ */
+class MapStringToDlpRow extends DoFn<KV<String, String>, KV<String, Table.Row>> {
+  private final String delimiter;
+
+  /**
+   * @param delimiter Delimiter (a regular expression) of values in the delimited value row that may
+   *     be in the value of input KV, or {@code null} for unstructured input.
+   */
+  public MapStringToDlpRow(String delimiter) {
+    this.delimiter = delimiter;
+  }
+
+  @ProcessElement
+  public void processElement(ProcessContext context) {
+    Table.Row.Builder rowBuilder = Table.Row.newBuilder();
+    String line = Objects.requireNonNull(context.element().getValue());
+    if (delimiter != null) {
+      // A negative limit keeps trailing empty fields ("a;b;" -> ["a", "b", ""]); the one-argument
+      // split drops them and would leave the row shorter than the header list.
+      List<String> values = Arrays.asList(line.split(delimiter, -1));
+      for (String value : values) {
+        rowBuilder.addValues(Value.newBuilder().setStringValue(value).build());
+      }
+    } else {
+      rowBuilder.addValues(Value.newBuilder().setStringValue(line).build());
+    }
+    context.output(KV.of(context.element().getKey(), rowBuilder.build()));
+  }
+}
diff --git a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/BatchRequestForDlpTest.java b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/BatchRequestForDlpTest.java
new file mode 100644
index 0000000..3e9a741
--- /dev/null
+++ b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/BatchRequestForDlpTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.privacy.dlp.v2.Table;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class BatchRequestForDlpTest {
+
+  @Rule public TestPipeline testPipeline = TestPipeline.create();
+
+  /** Two small rows sharing one key are expected to end up in a single batch. */
+  @Test
+  public void batchesRequests() {
+    PCollection<KV<String, Table.Row>> rows =
+        testPipeline
+            .apply(Create.of(KV.of("key", "value1"), KV.of("key", "value2")))
+            .apply(ParDo.of(new MapStringToDlpRow(null)));
+    PCollection<KV<String, Iterable<Table.Row>>> batches =
+        rows.apply(ParDo.of(new BatchRequestForDLP(524000)));
+    PAssert.that(batches).satisfies(new VerifyPCollectionSize());
+    testPipeline.run().waitUntilFinish();
+  }
+
+  /** Asserts that the checked PCollection holds exactly one element. */
+  private static class VerifyPCollectionSize
+      implements SerializableFunction<Iterable<KV<String, Iterable<Table.Row>>>, Void> {
+    @Override
+    public Void apply(Iterable<KV<String, Iterable<Table.Row>>> input) {
+      List<KV<String, Iterable<Table.Row>>> batches = new ArrayList<>();
+      for (KV<String, Iterable<Table.Row>> batch : input) {
+        batches.add(batch);
+      }
+      assertEquals(1, batches.size());
+      return null;
+    }
+  }
+}
diff --git a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyTextTest.java b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyTextTest.java
new file mode 100644
index 0000000..c2e5741
--- /dev/null
+++ b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyTextTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import static org.junit.Assert.assertThrows;
+
+import java.util.List;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for the argument validation performed when building {@link DLPDeidentifyText}. */
+@RunWith(JUnit4.class)
+public class DLPDeidentifyTextTest {
+
+  @Rule public TestPipeline testPipeline = TestPipeline.create();
+
+  private static final Integer BATCH_SIZE_SMALL = 200;
+  private static final String DELIMITER = ";";
+  private static final String TEMPLATE_NAME = "test_template";
+  private static final String PROJECT_ID = "test_id";
+
+  // Each test violates exactly one builder precondition, so an unrelated validation failure cannot
+  // make it pass. The String given to assertThrows is only the failure message reported when
+  // nothing is thrown; it is not compared with the exception's message.
+
+  @Test
+  public void throwsExceptionWhenDeidentifyConfigAndTemplatesAreEmpty() {
+    assertThrows(
+        "Either deidentifyConfig or deidentifyTemplateName need to be set!",
+        IllegalArgumentException.class,
+        () ->
+            DLPDeidentifyText.newBuilder()
+                .setProjectId(PROJECT_ID)
+                .setBatchSizeBytes(BATCH_SIZE_SMALL)
+                .build());
+  }
+
+  @Test
+  public void throwsExceptionWhenDelimiterIsNullAndHeadersAreSet() {
+    PCollectionView<List<String>> header =
+        testPipeline.apply(Create.of("header")).apply(View.asList());
+    assertThrows(
+        "Column delimiter should be set if headers are present.",
+        IllegalArgumentException.class,
+        () ->
+            DLPDeidentifyText.newBuilder()
+                .setProjectId(PROJECT_ID)
+                .setBatchSizeBytes(BATCH_SIZE_SMALL)
+                .setDeidentifyTemplateName(TEMPLATE_NAME)
+                .setHeaderColumns(header)
+                .build());
+    testPipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void throwsExceptionWhenBatchSizeIsTooLarge() {
+    assertThrows(
+        String.format(
+            "Batch size is too large! It should be smaller or equal than %d.",
+            DLPDeidentifyText.DLP_PAYLOAD_LIMIT_BYTES),
+        IllegalArgumentException.class,
+        () ->
+            DLPDeidentifyText.newBuilder()
+                .setProjectId(PROJECT_ID)
+                .setBatchSizeBytes(Integer.MAX_VALUE)
+                .setDeidentifyTemplateName(TEMPLATE_NAME)
+                .build());
+  }
+
+  @Test
+  public void throwsExceptionWhenDelimiterIsSetAndHeadersAreNot() {
+    assertThrows(
+        "Column headers should be supplied when delimiter is present.",
+        IllegalArgumentException.class,
+        () ->
+            DLPDeidentifyText.newBuilder()
+                .setProjectId(PROJECT_ID)
+                .setBatchSizeBytes(BATCH_SIZE_SMALL)
+                .setDeidentifyTemplateName(TEMPLATE_NAME)
+                .setColumnDelimiter(DELIMITER)
+                .build());
+  }
+}
diff --git a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DLPInspectTextTest.java b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DLPInspectTextTest.java
new file mode 100644
index 0000000..46c8fa3
--- /dev/null
+++ b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DLPInspectTextTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import static org.junit.Assert.assertThrows;
+
+import java.util.List;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for the argument validation performed when building {@link DLPInspectText}. */
+@RunWith(JUnit4.class)
+public class DLPInspectTextTest {
+
+  @Rule public TestPipeline testPipeline = TestPipeline.create();
+
+  private static final Integer BATCH_SIZE_SMALL = 200;
+  private static final String DELIMITER = ";";
+  private static final String TEMPLATE_NAME = "test_template";
+  private static final String PROJECT_ID = "test_id";
+
+  // Each test violates exactly one builder precondition, so an unrelated validation failure cannot
+  // make it pass. The String given to assertThrows is only the failure message reported when
+  // nothing is thrown; it is not compared with the exception's message.
+
+  @Test
+  public void throwsExceptionWhenDeidentifyConfigAndTemplatesAreEmpty() {
+    assertThrows(
+        "Either inspectTemplateName or inspectConfig must be supplied!",
+        IllegalArgumentException.class,
+        () ->
+            DLPInspectText.newBuilder()
+                .setProjectId(PROJECT_ID)
+                .setBatchSizeBytes(BATCH_SIZE_SMALL)
+                .build());
+  }
+
+  @Test
+  public void throwsExceptionWhenDelimiterIsNullAndHeadersAreSet() {
+    PCollectionView<List<String>> header =
+        testPipeline.apply(Create.of("header")).apply(View.asList());
+    assertThrows(
+        "Column delimiter should be set if headers are present.",
+        IllegalArgumentException.class,
+        () ->
+            DLPInspectText.newBuilder()
+                .setProjectId(PROJECT_ID)
+                .setBatchSizeBytes(BATCH_SIZE_SMALL)
+                .setInspectTemplateName(TEMPLATE_NAME)
+                .setHeaderColumns(header)
+                .build());
+    testPipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void throwsExceptionWhenBatchSizeIsTooLarge() {
+    assertThrows(
+        String.format(
+            "Batch size is too large! It should be smaller or equal than %d.",
+            DLPInspectText.DLP_PAYLOAD_LIMIT_BYTES),
+        IllegalArgumentException.class,
+        () ->
+            DLPInspectText.newBuilder()
+                .setProjectId(PROJECT_ID)
+                .setBatchSizeBytes(Integer.MAX_VALUE)
+                .setInspectTemplateName(TEMPLATE_NAME)
+                .build());
+  }
+
+  @Test
+  public void throwsExceptionWhenDelimiterIsSetAndHeadersAreNot() {
+    assertThrows(
+        "Column headers should be supplied when delimiter is present.",
+        IllegalArgumentException.class,
+        () ->
+            DLPInspectText.newBuilder()
+                .setProjectId(PROJECT_ID)
+                .setBatchSizeBytes(BATCH_SIZE_SMALL)
+                .setInspectTemplateName(TEMPLATE_NAME)
+                .setColumnDelimiter(DELIMITER)
+                .build());
+  }
+}
diff --git a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DLPReidentifyTextTest.java b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DLPReidentifyTextTest.java
new file mode 100644
index 0000000..9fc0f0a
--- /dev/null
+++ b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DLPReidentifyTextTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import java.util.List;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for the argument validation performed by {@link DLPReidentifyText.Builder#build()}. */
+@RunWith(JUnit4.class)
+public class DLPReidentifyTextTest {
+
+  @Rule public TestPipeline testPipeline = TestPipeline.create();
+
+  private static final Integer BATCH_SIZE_SMALL = 200;
+  private static final String DELIMITER = ";";
+  private static final String TEMPLATE_NAME = "test_template";
+  private static final String PROJECT_ID = "test_id";
+
+  // Each test violates exactly one builder precondition and checks the exception message, so a
+  // test cannot pass because a different validation rule fired.
+
+  @Test
+  public void throwsExceptionWhenDeidentifyConfigAndTemplatesAreEmpty() {
+    IllegalArgumentException exception =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                DLPReidentifyText.newBuilder()
+                    .setProjectId(PROJECT_ID)
+                    .setBatchSizeBytes(BATCH_SIZE_SMALL)
+                    .build());
+    assertEquals(
+        "Either reidentifyConfig or reidentifyTemplateName need to be set!",
+        exception.getMessage());
+  }
+
+  @Test
+  public void throwsExceptionWhenDelimiterIsNullAndHeadersAreSet() {
+    PCollectionView<List<String>> header =
+        testPipeline.apply(Create.of("header")).apply(View.asList());
+    IllegalArgumentException exception =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                DLPReidentifyText.newBuilder()
+                    .setProjectId(PROJECT_ID)
+                    .setBatchSizeBytes(BATCH_SIZE_SMALL)
+                    .setReidentifyTemplateName(TEMPLATE_NAME)
+                    .setHeaderColumns(header)
+                    .build());
+    testPipeline.run().waitUntilFinish();
+    assertEquals("Column delimiter should be set if headers are present.", exception.getMessage());
+  }
+
+  @Test
+  public void throwsExceptionWhenBatchSizeIsTooLarge() {
+    IllegalArgumentException exception =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                DLPReidentifyText.newBuilder()
+                    .setProjectId(PROJECT_ID)
+                    .setBatchSizeBytes(Integer.MAX_VALUE)
+                    .setReidentifyTemplateName(TEMPLATE_NAME)
+                    .build());
+    assertEquals(
+        String.format(
+            "Batch size is too large! It should be smaller or equal than %d.",
+            DLPReidentifyText.DLP_PAYLOAD_LIMIT_BYTES),
+        exception.getMessage());
+  }
+
+  @Test
+  public void throwsExceptionWhenDelimiterIsSetAndHeadersAreNot() {
+    IllegalArgumentException exception =
+        assertThrows(
+            IllegalArgumentException.class,
+            () ->
+                DLPReidentifyText.newBuilder()
+                    .setProjectId(PROJECT_ID)
+                    .setBatchSizeBytes(BATCH_SIZE_SMALL)
+                    .setReidentifyTemplateName(TEMPLATE_NAME)
+                    .setColumnDelimiter(DELIMITER)
+                    .build());
+    assertEquals(
+        "Column headers should be supplied when delimiter is present.", exception.getMessage());
+  }
+}
diff --git a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DLPTextOperationsIT.java b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DLPTextOperationsIT.java
new file mode 100644
index 0000000..0ebbfb7
--- /dev/null
+++ b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DLPTextOperationsIT.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import static org.junit.Assert.assertTrue;
+
+import com.google.privacy.dlp.v2.CharacterMaskConfig;
+import com.google.privacy.dlp.v2.DeidentifyConfig;
+import com.google.privacy.dlp.v2.DeidentifyContentResponse;
+import com.google.privacy.dlp.v2.FieldId;
+import com.google.privacy.dlp.v2.Finding;
+import com.google.privacy.dlp.v2.InfoType;
+import com.google.privacy.dlp.v2.InfoTypeTransformations;
+import com.google.privacy.dlp.v2.InspectConfig;
+import com.google.privacy.dlp.v2.InspectContentResponse;
+import com.google.privacy.dlp.v2.Likelihood;
+import com.google.privacy.dlp.v2.PrimitiveTransformation;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class DLPTextOperationsIT {
+ @Rule public TestPipeline testPipeline = TestPipeline.create();
+
+ private static final String IDENTIFYING_TEXT = "mary.sue@example.com";
+ private static InfoType emailAddress = InfoType.newBuilder().setName("EMAIL_ADDRESS").build();
+ private static final InspectConfig inspectConfig =
+ InspectConfig.newBuilder()
+ .addInfoTypes(emailAddress)
+ .setMinLikelihood(Likelihood.LIKELY)
+ .build();
+
+ @Test
+ public void inspectsText() {
+ String projectId = testPipeline.getOptions().as(GcpOptions.class).getProject();
+ PCollection<KV<String, InspectContentResponse>> inspectionResult =
+ testPipeline
+ .apply(Create.of(KV.of("", IDENTIFYING_TEXT)))
+ .apply(
+ DLPInspectText.newBuilder()
+ .setBatchSizeBytes(524000)
+ .setProjectId(projectId)
+ .setInspectConfig(inspectConfig)
+ .build());
+ PAssert.that(inspectionResult).satisfies(new VerifyInspectionResult());
+ testPipeline.run().waitUntilFinish();
+ }
+
+ @Test
+ public void deidentifiesText() {
+ String projectId = testPipeline.getOptions().as(GcpOptions.class).getProject();
+
+ PCollection<KV<String, DeidentifyContentResponse>> deidentificationResult =
+ testPipeline
+ .apply(Create.of(KV.of("", IDENTIFYING_TEXT)))
+ .apply(
+ DLPDeidentifyText.newBuilder()
+ .setBatchSizeBytes(524000)
+ .setProjectId(projectId)
+ .setDeidentifyConfig(getDeidentifyConfig())
+ .build());
+ PAssert.that(deidentificationResult)
+ .satisfies(new VerifyDeidentificationResult("####################"));
+ testPipeline.run().waitUntilFinish();
+ }
+
+ private DeidentifyConfig getDeidentifyConfig() {
+ CharacterMaskConfig characterMaskConfig =
+ CharacterMaskConfig.newBuilder().setMaskingCharacter("#").build();
+ PrimitiveTransformation primitiveTransformation =
+ PrimitiveTransformation.newBuilder().setCharacterMaskConfig(characterMaskConfig).build();
+ InfoTypeTransformations.InfoTypeTransformation infoTypeTransformation =
+ InfoTypeTransformations.InfoTypeTransformation.newBuilder()
+ .addInfoTypes(emailAddress)
+ .setPrimitiveTransformation(primitiveTransformation)
+ .build();
+ return DeidentifyConfig.newBuilder()
+ .setInfoTypeTransformations(
+ InfoTypeTransformations.newBuilder().addTransformations(infoTypeTransformation).build())
+ .build();
+ }
+
+ private static class VerifyInspectionResult
+ implements SerializableFunction<Iterable<KV<String, InspectContentResponse>>, Void> {
+ @Override
+ public Void apply(Iterable<KV<String, InspectContentResponse>> input) {
+ List<Boolean> matches = new ArrayList<>();
+ input.forEach(
+ item -> {
+ List<Finding> resultList = item.getValue().getResult().getFindingsList();
+ matches.add(
+ resultList.stream()
+ .anyMatch(finding -> finding.getInfoType().equals(emailAddress)));
+ });
+ assertTrue(matches.contains(Boolean.TRUE));
+ return null;
+ }
+ }
+
+ private static class VerifyDeidentificationResult
+ implements SerializableFunction<Iterable<KV<String, DeidentifyContentResponse>>, Void> {
+ private final String expectedValue;
+
+ public VerifyDeidentificationResult(String expectedValue) {
+ this.expectedValue = expectedValue;
+ }
+
+ @Override
+ public Void apply(Iterable<KV<String, DeidentifyContentResponse>> input) {
+ List<Boolean> matches = new ArrayList<>();
+ input.forEach(
+ item -> {
+ item.getValue()
+ .getItem()
+ .getTable()
+ .getRowsList()
+ .forEach(
+ row ->
+ matches.add(
+ row.getValuesList().stream()
+ .anyMatch(value -> value.getStringValue().equals(expectedValue))));
+ assertTrue(
+ item.getValue()
+ .getItem()
+ .getTable()
+ .getHeadersList()
+ .contains(FieldId.newBuilder().setName("value").build()));
+ });
+ assertTrue(matches.contains(Boolean.TRUE));
+ return null;
+ }
+ }
+}
diff --git a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/MapStringToDlpRowTest.java b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/MapStringToDlpRowTest.java
new file mode 100644
index 0000000..577a5dc
--- /dev/null
+++ b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/MapStringToDlpRowTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.privacy.dlp.v2.Table;
+import com.google.privacy.dlp.v2.Value;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link MapStringToDlpRow}. */
+@RunWith(JUnit4.class)
+public class MapStringToDlpRowTest {
+  @Rule public TestPipeline testPipeline = TestPipeline.create();
+
+  /** With a {@code null} delimiter the whole input string is expected as a single-cell row. */
+  @Test
+  public void mapsStringToRow() {
+    PCollection<KV<String, Table.Row>> rows =
+        testPipeline
+            .apply(Create.of(KV.of("key", "value")))
+            .apply(ParDo.of(new MapStringToDlpRow(null)));
+
+    PAssert.that(rows).containsInAnyOrder(KV.of("key", rowOf("value")));
+    testPipeline.run().waitUntilFinish();
+  }
+
+  /** With a {@code ","} delimiter each comma-separated piece is expected as its own cell. */
+  @Test
+  public void mapsDelimitedStringToRow() {
+    PCollection<KV<String, Table.Row>> rows =
+        testPipeline
+            .apply(Create.of(KV.of("key", "value,secondValue")))
+            .apply(ParDo.of(new MapStringToDlpRow(",")));
+
+    PAssert.that(rows).containsInAnyOrder(KV.of("key", rowOf("value", "secondValue")));
+    testPipeline.run().waitUntilFinish();
+  }
+
+  /** Builds the DLP table row whose string cells are {@code cells}, in the given order. */
+  private static Table.Row rowOf(String... cells) {
+    Table.Row.Builder row = Table.Row.newBuilder();
+    for (String cell : cells) {
+      row.addValues(Value.newBuilder().setStringValue(cell).build());
+    }
+    return row.build();
+  }
+}