You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mw...@apache.org on 2020/04/29 09:40:18 UTC
[beam] 01/01: [BEAM-9723] Add DLP integration transforms
This is an automated email from the ASF dual-hosted git repository.
mwalenia pushed a commit to branch BEAM-9723-java-dlp
in repository https://gitbox.apache.org/repos/asf/beam.git
commit a6c0dc3024f1defc1f659b2371f516925bc9d7f2
Author: Michal Walenia <mi...@polidea.com>
AuthorDate: Mon Apr 27 10:04:12 2020 +0200
[BEAM-9723] Add DLP integration transforms
---
sdks/java/extensions/ml/build.gradle | 5 +-
.../beam/sdk/extensions/ml/BatchRequestForDLP.java | 119 +++++++++++++
.../beam/sdk/extensions/ml/DLPDeidentifyText.java | 186 +++++++++++++++++++++
.../beam/sdk/extensions/ml/DLPInspectText.java | 147 ++++++++++++++++
.../beam/sdk/extensions/ml/DLPReidentifyText.java | 179 ++++++++++++++++++++
.../sdk/extensions/ml/DLPTextOperationsIT.java | 121 ++++++++++++++
6 files changed, 756 insertions(+), 1 deletion(-)
diff --git a/sdks/java/extensions/ml/build.gradle b/sdks/java/extensions/ml/build.gradle
index 274c074..6f9b567 100644
--- a/sdks/java/extensions/ml/build.gradle
+++ b/sdks/java/extensions/ml/build.gradle
@@ -26,10 +26,13 @@ description = 'Apache Beam :: SDKs :: Java :: Extensions :: ML'
dependencies {
compile project(path: ":sdks:java:core", configuration: "shadow")
compile project(":sdks:java:expansion-service")
- testCompile project(path: ':sdks:java:core', configuration: 'shadowTest')
compile 'com.google.cloud:google-cloud-video-intelligence:1.2.0'
+ compile 'com.google.cloud:google-cloud-dlp:1.1.1'
+ testCompile project(path: ':sdks:java:core', configuration: 'shadowTest')
testCompile library.java.mockito_core
testCompile 'com.google.cloud:google-cloud-video-intelligence:1.2.0'
+ testCompile 'com.google.cloud:google-cloud-dlp:1.1.1'
+ testCompile project(path: ":sdks:java:extensions:google-cloud-platform-core", configuration: "testRuntime")
testCompile library.java.junit
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
testRuntimeOnly project(":runners:google-cloud-dataflow-java")
diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/BatchRequestForDLP.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/BatchRequestForDLP.java
new file mode 100644
index 0000000..9a2d4d2
--- /dev/null
+++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/BatchRequestForDLP.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.KV;
+import org.joda.time.Instant;
+
+/**
+ * DoFn batching the input PCollection into bigger requests in order to better utilize the Cloud DLP
+ * service.
+ */
+class BatchRequestForDLP extends DoFn<KV<String, String>, KV<String, String>> {
+ private final Integer batchSize;
+ public static final Integer DLP_PAYLOAD_LIMIT = 52400;
+
+ public BatchRequestForDLP(Integer batchSize) {
+ if (batchSize > DLP_PAYLOAD_LIMIT) {
+ throw new IllegalArgumentException(
+ "DLP batch size exceeds payload limit.\n"
+ + "Batch size should be smaller than "
+ + DLP_PAYLOAD_LIMIT);
+ }
+ this.batchSize = batchSize;
+ }
+
+ @StateId("elementsBag")
+ private final StateSpec<BagState<KV<String, String>>> elementsBag = StateSpecs.bag();
+
+ @StateId("elementsSize")
+ private final StateSpec<ValueState<Integer>> elementsSize = StateSpecs.value();
+
+ @TimerId("eventTimer")
+ private final TimerSpec eventTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @ProcessElement
+ public void process(
+ @Element KV<String, String> element,
+ @StateId("elementsBag") BagState<KV<String, String>> elementsBag,
+ @StateId("elementsSize") ValueState<Integer> elementsSize,
+ @Timestamp Instant elementTs,
+ @TimerId("eventTimer") Timer eventTimer,
+ OutputReceiver<KV<String, String>> output) {
+ eventTimer.set(elementTs);
+ Integer currentElementSize =
+ (element.getValue() == null) ? 0 : element.getValue().getBytes(UTF_8).length;
+ Integer currentBufferSize = (elementsSize.read() == null) ? 0 : elementsSize.read();
+ boolean clearBuffer = (currentElementSize + currentBufferSize) > batchSize;
+ if (clearBuffer) {
+ KV<String, String> inspectBufferedData = emitResult(elementsBag.read());
+ output.output(inspectBufferedData);
+ DLPInspectText.LOG.info(
+ "****CLEAR BUFFER Key {} **** Current Content Size {}",
+ inspectBufferedData.getKey(),
+ inspectBufferedData.getValue().getBytes(UTF_8).length);
+ clearState(elementsBag, elementsSize);
+ } else {
+ elementsBag.add(element);
+ elementsSize.write(currentElementSize + currentBufferSize);
+ }
+ }
+
+ @OnTimer("eventTimer")
+ public void onTimer(
+ @StateId("elementsBag") BagState<KV<String, String>> elementsBag,
+ @StateId("elementsSize") ValueState<Integer> elementsSize,
+ OutputReceiver<KV<String, String>> output) {
+ // Process left over records less than batch size
+ KV<String, String> inspectBufferedData = emitResult(elementsBag.read());
+ output.output(inspectBufferedData);
+ DLPInspectText.LOG.info(
+ "****Timer Triggered Key {} **** Current Content Size {}",
+ inspectBufferedData.getKey(),
+ inspectBufferedData.getValue().getBytes(UTF_8).length);
+ clearState(elementsBag, elementsSize);
+ }
+
+ private static KV<String, String> emitResult(Iterable<KV<String, String>> bufferData) {
+ StringBuilder builder = new StringBuilder();
+ String fileName =
+ (bufferData.iterator().hasNext()) ? bufferData.iterator().next().getKey() : "UNKNOWN_FILE";
+ bufferData.forEach(
+ e -> {
+ builder.append(e.getValue());
+ });
+ return KV.of(fileName, builder.toString());
+ }
+
+ private static void clearState(
+ BagState<KV<String, String>> elementsBag, ValueState<Integer> elementsSize) {
+ elementsBag.clear();
+ elementsSize.clear();
+ }
+}
diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyText.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyText.java
new file mode 100644
index 0000000..b7e037a
--- /dev/null
+++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyText.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.dlp.v2.DlpServiceClient;
+import com.google.privacy.dlp.v2.ContentItem;
+import com.google.privacy.dlp.v2.DeidentifyConfig;
+import com.google.privacy.dlp.v2.DeidentifyContentRequest;
+import com.google.privacy.dlp.v2.DeidentifyContentResponse;
+import com.google.privacy.dlp.v2.InspectConfig;
+import com.google.privacy.dlp.v2.ProjectName;
+import java.io.IOException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} connecting to Cloud DLP and deidentifying text according to provided
+ * settings.
+ *
+ * <p>Either inspectTemplateName (String) or inspectConfig ({@link InspectConfig}) needs to be set.
+ * The same applies to deidentifyTemplateName and deidentifyConfig ({@link DeidentifyConfig}).
+ *
+ * <p>Batch size defines how big the batches sent to DLP at once are, in bytes.
+ */
+@Experimental
+@AutoValue
+public abstract class DLPDeidentifyText
+    extends PTransform<PCollection<KV<String, String>>, PCollection<KV<String, String>>> {
+
+  public static final Logger LOG = LoggerFactory.getLogger(DLPDeidentifyText.class);
+
+  /** Requests whose serialized size exceeds this many bytes are logged and not sent. */
+  public static final Integer DLP_PAYLOAD_LIMIT = 52400;
+
+  /** Name of the inspect template to use; may be null when {@link #inspectConfig()} is set. */
+  @Nullable
+  public abstract String inspectTemplateName();
+
+  /**
+   * Name of the deidentify template to use; may be null when {@link #deidentifyConfig()} is set.
+   */
+  @Nullable
+  public abstract String deidentifyTemplateName();
+
+  /** Inline inspect configuration; may be null when {@link #inspectTemplateName()} is set. */
+  @Nullable
+  public abstract InspectConfig inspectConfig();
+
+  /**
+   * Inline deidentify configuration; may be null when {@link #deidentifyTemplateName()} is set.
+   */
+  @Nullable
+  public abstract DeidentifyConfig deidentifyConfig();
+
+  /** Batch size in bytes handed to {@link BatchRequestForDLP}. */
+  public abstract Integer batchSize();
+
+  /** Id of the project used as the parent of the DLP requests. */
+  public abstract String projectId();
+
+  /** Builder for {@link DLPDeidentifyText}. */
+  @AutoValue.Builder
+  public abstract static class Builder {
+    public abstract Builder setInspectTemplateName(String inspectTemplateName);
+
+    public abstract Builder setBatchSize(Integer batchSize);
+
+    public abstract Builder setProjectId(String projectId);
+
+    public abstract Builder setDeidentifyTemplateName(String deidentifyTemplateName);
+
+    public abstract Builder setInspectConfig(InspectConfig inspectConfig);
+
+    public abstract Builder setDeidentifyConfig(DeidentifyConfig deidentifyConfig);
+
+    public abstract DLPDeidentifyText build();
+  }
+
+  /** Returns a new, empty {@link Builder}. */
+  public static DLPDeidentifyText.Builder newBuilder() {
+    return new AutoValue_DLPDeidentifyText.Builder();
+  }
+
+  /**
+   * The transform batches the contents of input PCollection and then calls Cloud DLP service to
+   * perform the deidentification.
+   *
+   * @param input input PCollection
+   * @return PCollection after transformations
+   */
+  @Override
+  public PCollection<KV<String, String>> expand(PCollection<KV<String, String>> input) {
+    return input
+        .apply("Batch Contents", ParDo.of(new BatchRequestForDLP(batchSize())))
+        .apply(
+            "DLPDeidentify",
+            ParDo.of(
+                new DeidentifyText(
+                    projectId(),
+                    inspectTemplateName(),
+                    deidentifyTemplateName(),
+                    inspectConfig(),
+                    deidentifyConfig())));
+  }
+
+  /** DoFn sending each batched element to Cloud DLP and emitting the deidentified text. */
+  static class DeidentifyText extends DoFn<KV<String, String>, KV<String, String>> {
+    private final String projectId;
+    private final String inspectTemplateName;
+    private final String deidentifyTemplateName;
+    private final InspectConfig inspectConfig;
+    private final DeidentifyConfig deidentifyConfig;
+    private transient DeidentifyContentRequest.Builder requestBuilder;
+    // Created once per DoFn instance instead of once per element; closed in teardown().
+    private transient DlpServiceClient dlpServiceClient;
+
+    /**
+     * Validates the configuration, prepares the request template and opens the DLP client.
+     *
+     * @throws IllegalArgumentException if neither a config nor a template name is provided for
+     *     inspection or for deidentification
+     * @throws IOException if the DLP client cannot be created
+     */
+    @Setup
+    public void setup() throws IOException {
+      if (inspectConfig == null && inspectTemplateName == null) {
+        throw new IllegalArgumentException(
+            "Either inspectConfig or inspectTemplateName need to be set!");
+      }
+      if (deidentifyConfig == null && deidentifyTemplateName == null) {
+        throw new IllegalArgumentException(
+            "Either deidentifyConfig or deidentifyTemplateName need to be set!");
+      }
+      requestBuilder =
+          DeidentifyContentRequest.newBuilder().setParent(ProjectName.of(projectId).toString());
+      if (inspectTemplateName != null) {
+        requestBuilder.setInspectTemplateName(inspectTemplateName);
+      }
+      if (inspectConfig != null) {
+        requestBuilder.setInspectConfig(inspectConfig);
+      }
+      if (deidentifyConfig != null) {
+        requestBuilder.setDeidentifyConfig(deidentifyConfig);
+      }
+      if (deidentifyTemplateName != null) {
+        requestBuilder.setDeidentifyTemplateName(deidentifyTemplateName);
+      }
+      dlpServiceClient = DlpServiceClient.create();
+    }
+
+    /** Closes the DLP client opened in {@link #setup()}. */
+    @Teardown
+    public void teardown() {
+      if (dlpServiceClient != null) {
+        dlpServiceClient.close();
+        dlpServiceClient = null;
+      }
+    }
+
+    public DeidentifyText(
+        String projectId,
+        String inspectTemplateName,
+        String deidentifyTemplateName,
+        InspectConfig inspectConfig,
+        DeidentifyConfig deidentifyConfig) {
+      this.projectId = projectId;
+      this.inspectTemplateName = inspectTemplateName;
+      this.deidentifyTemplateName = deidentifyTemplateName;
+      this.inspectConfig = inspectConfig;
+      this.deidentifyConfig = deidentifyConfig;
+    }
+
+    /**
+     * Deidentifies the value of the current element and outputs it under the same key.
+     *
+     * <p>Empty values are skipped. Requests larger than {@link #DLP_PAYLOAD_LIMIT} are logged as
+     * errors and produce no output.
+     */
+    @ProcessElement
+    public void processElement(ProcessContext c) throws IOException {
+      String text = c.element().getValue();
+      if (text.isEmpty()) {
+        return;
+      }
+      ContentItem contentItem = ContentItem.newBuilder().setValue(text).build();
+      // Build the request once and reuse it for both the size check and the call.
+      DeidentifyContentRequest request = this.requestBuilder.setItem(contentItem).build();
+      int payloadSize = request.getSerializedSize();
+      if (payloadSize > DLP_PAYLOAD_LIMIT) {
+        String errorMessage =
+            String.format(
+                "Payload Size %s Exceeded Batch Size %s", payloadSize, DLP_PAYLOAD_LIMIT);
+        LOG.error(errorMessage);
+        return;
+      }
+      DeidentifyContentResponse response = dlpServiceClient.deidentifyContent(request);
+      c.output(KV.of(c.element().getKey(), response.getItem().getValue()));
+    }
+  }
+}
diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPInspectText.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPInspectText.java
new file mode 100644
index 0000000..e63bfe3
--- /dev/null
+++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPInspectText.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.dlp.v2.DlpServiceClient;
+import com.google.privacy.dlp.v2.ContentItem;
+import com.google.privacy.dlp.v2.Finding;
+import com.google.privacy.dlp.v2.InspectConfig;
+import com.google.privacy.dlp.v2.InspectContentRequest;
+import com.google.privacy.dlp.v2.InspectContentResponse;
+import com.google.privacy.dlp.v2.ProjectName;
+import java.io.IOException;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} connecting to Cloud DLP and inspecting text for identifying data according
+ * to provided settings.
+ *
+ * <p>Either inspectTemplateName (String) or inspectConfig ({@link InspectConfig}) needs to be set.
+ *
+ * <p>Batch size defines how big the batches sent to DLP at once are, in bytes.
+ */
+@Experimental
+@AutoValue
+public abstract class DLPInspectText
+    extends PTransform<PCollection<KV<String, String>>, PCollection<List<Finding>>> {
+  public static final Logger LOG = LoggerFactory.getLogger(DLPInspectText.class);
+
+  /** Requests whose serialized size exceeds this many bytes are logged and not sent. */
+  public static final Integer DLP_PAYLOAD_LIMIT = 52400;
+
+  /** Name of the inspect template to use; may be null when {@link #inspectConfig()} is set. */
+  @Nullable
+  public abstract String inspectTemplateName();
+
+  /** Inline inspect configuration; may be null when {@link #inspectTemplateName()} is set. */
+  @Nullable
+  public abstract InspectConfig inspectConfig();
+
+  /** Batch size in bytes handed to {@link BatchRequestForDLP}. */
+  public abstract Integer batchSize();
+
+  /** Id of the project used as the parent of the DLP requests. */
+  public abstract String projectId();
+
+  /** Builder for {@link DLPInspectText}. */
+  @AutoValue.Builder
+  public abstract static class Builder {
+    public abstract Builder setInspectTemplateName(String inspectTemplateName);
+
+    public abstract Builder setInspectConfig(InspectConfig inspectConfig);
+
+    public abstract Builder setBatchSize(Integer batchSize);
+
+    public abstract Builder setProjectId(String projectId);
+
+    public abstract DLPInspectText build();
+  }
+
+  /** Returns a new, empty {@link Builder}. */
+  public static Builder newBuilder() {
+    return new AutoValue_DLPInspectText.Builder();
+  }
+
+  /**
+   * Batches the contents of the input PCollection and then calls Cloud DLP to inspect each batch.
+   *
+   * @param input input PCollection
+   * @return PCollection holding one list of findings per inspected batch
+   */
+  @Override
+  public PCollection<List<Finding>> expand(PCollection<KV<String, String>> input) {
+    return input
+        .apply("Batch Contents", ParDo.of(new BatchRequestForDLP(batchSize())))
+        .apply(
+            "DLPInspect",
+            ParDo.of(new InspectData(projectId(), inspectTemplateName(), inspectConfig())));
+  }
+
+  /** DoFn sending each batched element to Cloud DLP and emitting the returned findings. */
+  public static class InspectData extends DoFn<KV<String, String>, List<Finding>> {
+    private final String projectId;
+    private final String inspectTemplateName;
+    private final InspectConfig inspectConfig;
+    private transient InspectContentRequest.Builder requestBuilder;
+    // Created once per DoFn instance instead of once per element; closed in teardown().
+    private transient DlpServiceClient dlpServiceClient;
+    private final Counter numberOfBytesInspected =
+        Metrics.counter(InspectData.class, "NumberOfBytesInspected");
+
+    public InspectData(String projectId, String inspectTemplateName, InspectConfig inspectConfig) {
+      this.projectId = projectId;
+      this.inspectTemplateName = inspectTemplateName;
+      this.inspectConfig = inspectConfig;
+    }
+
+    /**
+     * Validates the configuration, prepares the request template and opens the DLP client.
+     *
+     * @throws IllegalArgumentException if neither inspectTemplateName nor inspectConfig is set
+     * @throws IOException if the DLP client cannot be created
+     */
+    @Setup
+    public void setup() throws IOException {
+      if (inspectTemplateName == null && inspectConfig == null) {
+        throw new IllegalArgumentException(
+            "Either inspectTemplateName or inspectConfig need to be set!");
+      }
+      this.requestBuilder =
+          InspectContentRequest.newBuilder().setParent(ProjectName.of(this.projectId).toString());
+      if (inspectTemplateName != null) {
+        requestBuilder.setInspectTemplateName(this.inspectTemplateName);
+      }
+      if (inspectConfig != null) {
+        requestBuilder.setInspectConfig(inspectConfig);
+      }
+      dlpServiceClient = DlpServiceClient.create();
+    }
+
+    /** Closes the DLP client opened in {@link #setup()}. */
+    @Teardown
+    public void teardown() {
+      if (dlpServiceClient != null) {
+        dlpServiceClient.close();
+        dlpServiceClient = null;
+      }
+    }
+
+    /**
+     * Inspects the value of the current element and outputs the list of findings.
+     *
+     * <p>Empty values are skipped. Requests larger than {@link #DLP_PAYLOAD_LIMIT} are logged as
+     * errors and produce no output.
+     */
+    @ProcessElement
+    public void processElement(ProcessContext c) throws IOException {
+      String text = c.element().getValue();
+      if (text.isEmpty()) {
+        return;
+      }
+      ContentItem contentItem = ContentItem.newBuilder().setValue(text).build();
+      // Build the request once and reuse it for both the size check and the call.
+      InspectContentRequest request = this.requestBuilder.setItem(contentItem).build();
+      int payloadSize = request.getSerializedSize();
+      if (payloadSize > DLP_PAYLOAD_LIMIT) {
+        String errorMessage =
+            String.format(
+                "Payload Size %s Exceeded Batch Size %s", payloadSize, DLP_PAYLOAD_LIMIT);
+        LOG.error(errorMessage);
+        return;
+      }
+      InspectContentResponse response = dlpServiceClient.inspectContent(request);
+      List<Finding> findingsList = response.getResult().getFindingsList();
+      c.output(findingsList);
+      numberOfBytesInspected.inc(contentItem.getSerializedSize());
+    }
+  }
+}
diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPReidentifyText.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPReidentifyText.java
new file mode 100644
index 0000000..e841d9a
--- /dev/null
+++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPReidentifyText.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.dlp.v2.DlpServiceClient;
+import com.google.privacy.dlp.v2.ContentItem;
+import com.google.privacy.dlp.v2.DeidentifyConfig;
+import com.google.privacy.dlp.v2.InspectConfig;
+import com.google.privacy.dlp.v2.ProjectName;
+import com.google.privacy.dlp.v2.ReidentifyContentRequest;
+import com.google.privacy.dlp.v2.ReidentifyContentResponse;
+import java.io.IOException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} connecting to Cloud DLP and reidentifying previously deidentified text
+ * according to provided settings.
+ *
+ * <p>Either inspectTemplateName (String) or inspectConfig ({@link InspectConfig}) needs to be set;
+ * the same goes for reidentifyTemplateName or reidentifyConfig.
+ *
+ * <p>Batch size defines how big the batches sent to DLP at once are, in bytes.
+ */
+@Experimental
+@AutoValue
+public abstract class DLPReidentifyText
+    extends PTransform<PCollection<KV<String, String>>, PCollection<KV<String, String>>> {
+
+  public static final Logger LOG = LoggerFactory.getLogger(DLPReidentifyText.class);
+
+  /** Requests whose serialized size exceeds this many bytes are logged and not sent. */
+  public static final Integer DLP_PAYLOAD_LIMIT = 52400;
+
+  /** Name of the inspect template to use; may be null when {@link #inspectConfig()} is set. */
+  @Nullable
+  public abstract String inspectTemplateName();
+
+  /**
+   * Name of the reidentify template to use; may be null when {@link #reidentifyConfig()} is set.
+   */
+  @Nullable
+  public abstract String reidentifyTemplateName();
+
+  /** Inline inspect configuration; may be null when {@link #inspectTemplateName()} is set. */
+  @Nullable
+  public abstract InspectConfig inspectConfig();
+
+  /**
+   * Inline reidentify configuration; may be null when {@link #reidentifyTemplateName()} is set.
+   */
+  @Nullable
+  public abstract DeidentifyConfig reidentifyConfig();
+
+  /** Batch size in bytes handed to {@link BatchRequestForDLP}. */
+  public abstract Integer batchSize();
+
+  /** Id of the project used as the parent of the DLP requests. */
+  public abstract String projectId();
+
+  /** Builder for {@link DLPReidentifyText}. */
+  @AutoValue.Builder
+  public abstract static class Builder {
+    public abstract Builder setInspectTemplateName(String inspectTemplateName);
+
+    public abstract Builder setInspectConfig(InspectConfig inspectConfig);
+
+    public abstract Builder setReidentifyConfig(DeidentifyConfig reidentifyConfig);
+
+    public abstract Builder setReidentifyTemplateName(String reidentifyTemplateName);
+
+    public abstract Builder setBatchSize(Integer batchSize);
+
+    public abstract Builder setProjectId(String projectId);
+
+    public abstract DLPReidentifyText build();
+  }
+
+  /** Returns a new, empty {@link Builder}. */
+  public static DLPReidentifyText.Builder newBuilder() {
+    return new AutoValue_DLPReidentifyText.Builder();
+  }
+
+  /**
+   * Batches the contents of the input PCollection and then calls Cloud DLP to perform the
+   * reidentification.
+   *
+   * @param input input PCollection
+   * @return PCollection after transformations
+   */
+  @Override
+  public PCollection<KV<String, String>> expand(PCollection<KV<String, String>> input) {
+    return input
+        .apply("Batch Contents", ParDo.of(new BatchRequestForDLP(batchSize())))
+        .apply(
+            // Step label names the operation this transform actually performs.
+            "DLPReidentify",
+            ParDo.of(
+                new ReidentifyText(
+                    projectId(),
+                    inspectTemplateName(),
+                    reidentifyTemplateName(),
+                    inspectConfig(),
+                    reidentifyConfig())));
+  }
+
+  /** DoFn sending each batched element to Cloud DLP and emitting the reidentified text. */
+  public static class ReidentifyText extends DoFn<KV<String, String>, KV<String, String>> {
+    private final String projectId;
+    private final String inspectTemplateName;
+    private final String reidentifyTemplateName;
+    private final InspectConfig inspectConfig;
+    private final DeidentifyConfig reidentifyConfig;
+    private transient ReidentifyContentRequest.Builder requestBuilder;
+    // Created once per DoFn instance instead of once per element; closed in teardown().
+    private transient DlpServiceClient dlpServiceClient;
+
+    /**
+     * Validates the configuration, prepares the request template and opens the DLP client.
+     *
+     * @throws IllegalArgumentException if neither a config nor a template name is provided for
+     *     inspection or for reidentification
+     * @throws IOException if the DLP client cannot be created
+     */
+    @Setup
+    public void setup() throws IOException {
+      if (inspectConfig == null && inspectTemplateName == null) {
+        throw new IllegalArgumentException(
+            "Either inspectConfig or inspectTemplateName need to be set!");
+      }
+      if (reidentifyConfig == null && reidentifyTemplateName == null) {
+        throw new IllegalArgumentException(
+            "Either reidentifyConfig or reidentifyTemplateName need to be set!");
+      }
+      requestBuilder =
+          ReidentifyContentRequest.newBuilder().setParent(ProjectName.of(projectId).toString());
+      if (inspectTemplateName != null) {
+        requestBuilder.setInspectTemplateName(inspectTemplateName);
+      }
+      if (inspectConfig != null) {
+        requestBuilder.setInspectConfig(inspectConfig);
+      }
+      if (reidentifyConfig != null) {
+        requestBuilder.setReidentifyConfig(reidentifyConfig);
+      }
+      if (reidentifyTemplateName != null) {
+        requestBuilder.setReidentifyTemplateName(reidentifyTemplateName);
+      }
+      dlpServiceClient = DlpServiceClient.create();
+    }
+
+    /** Closes the DLP client opened in {@link #setup()}. */
+    @Teardown
+    public void teardown() {
+      if (dlpServiceClient != null) {
+        dlpServiceClient.close();
+        dlpServiceClient = null;
+      }
+    }
+
+    public ReidentifyText(
+        String projectId,
+        String inspectTemplateName,
+        String reidentifyTemplateName,
+        InspectConfig inspectConfig,
+        DeidentifyConfig reidentifyConfig) {
+      this.projectId = projectId;
+      this.inspectTemplateName = inspectTemplateName;
+      this.reidentifyTemplateName = reidentifyTemplateName;
+      this.inspectConfig = inspectConfig;
+      this.reidentifyConfig = reidentifyConfig;
+    }
+
+    /**
+     * Reidentifies the value of the current element and outputs it under the same key.
+     *
+     * <p>Empty values are skipped. Requests larger than {@link #DLP_PAYLOAD_LIMIT} are logged as
+     * errors and produce no output.
+     */
+    @ProcessElement
+    public void processElement(ProcessContext c) throws IOException {
+      String text = c.element().getValue();
+      if (text.isEmpty()) {
+        return;
+      }
+      ContentItem contentItem = ContentItem.newBuilder().setValue(text).build();
+      // Build the request once and reuse it for both the size check and the call.
+      ReidentifyContentRequest request = this.requestBuilder.setItem(contentItem).build();
+      int payloadSize = request.getSerializedSize();
+      if (payloadSize > DLP_PAYLOAD_LIMIT) {
+        String errorMessage =
+            String.format(
+                "Payload Size %s Exceeded Batch Size %s", payloadSize, DLP_PAYLOAD_LIMIT);
+        LOG.error(errorMessage);
+        return;
+      }
+      ReidentifyContentResponse response = dlpServiceClient.reidentifyContent(request);
+      c.output(KV.of(c.element().getKey(), response.getItem().getValue()));
+    }
+  }
+}
diff --git a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DLPTextOperationsIT.java b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DLPTextOperationsIT.java
new file mode 100644
index 0000000..5401105
--- /dev/null
+++ b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DLPTextOperationsIT.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.privacy.dlp.v2.CharacterMaskConfig;
+import com.google.privacy.dlp.v2.DeidentifyConfig;
+import com.google.privacy.dlp.v2.Finding;
+import com.google.privacy.dlp.v2.InfoType;
+import com.google.privacy.dlp.v2.InfoTypeTransformations;
+import com.google.privacy.dlp.v2.InspectConfig;
+import com.google.privacy.dlp.v2.Likelihood;
+import com.google.privacy.dlp.v2.PrimitiveTransformation;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class DLPTextOperationsIT {
+ @Rule public TestPipeline testPipeline = TestPipeline.create();
+
+ private static final String IDENTIFYING_TEXT = "mary.sue@example.com";
+ private static InfoType emailAddress = InfoType.newBuilder().setName("EMAIL_ADDRESS").build();;
+ private static InspectConfig inspectConfig =
+ InspectConfig.newBuilder()
+ .addInfoTypes(emailAddress)
+ .setMinLikelihood(Likelihood.LIKELY)
+ .build();
+
+ @Test
+ public void inspectsText() {
+ String projectId = testPipeline.getOptions().as(GcpOptions.class).getProject();
+ PCollection<List<Finding>> inspectionResult =
+ testPipeline
+ .apply(Create.of(KV.of("", IDENTIFYING_TEXT)))
+ .apply(
+ DLPInspectText.newBuilder()
+ .setBatchSize(52400)
+ .setProjectId(projectId)
+ .setInspectConfig(inspectConfig)
+ .build());
+ PAssert.that(inspectionResult).satisfies(new VerifyInspectionResult());
+ testPipeline.run().waitUntilFinish();
+ }
+
+ @Test
+ public void deidentifiesText() {
+ emailAddress = InfoType.newBuilder().setName("EMAIL_ADDRESS").build();
+ String projectId = testPipeline.getOptions().as(GcpOptions.class).getProject();
+
+ PCollection<KV<String, String>> deidentificationResult =
+ testPipeline
+ .apply(Create.of(KV.of("", IDENTIFYING_TEXT)))
+ .apply(
+ DLPDeidentifyText.newBuilder()
+ .setBatchSize(52400)
+ .setProjectId(projectId)
+ .setInspectConfig(inspectConfig)
+ .setDeidentifyConfig(getDeidentifyConfig())
+ .build());
+ PAssert.that(deidentificationResult).containsInAnyOrder(KV.of("", "####################"));
+ testPipeline.run().waitUntilFinish();
+ }
+
+ private DeidentifyConfig getDeidentifyConfig() {
+ CharacterMaskConfig characterMaskConfig =
+ CharacterMaskConfig.newBuilder().setMaskingCharacter("#").build();
+ PrimitiveTransformation primitiveTransformation =
+ PrimitiveTransformation.newBuilder().setCharacterMaskConfig(characterMaskConfig).build();
+ InfoTypeTransformations.InfoTypeTransformation infoTypeTransformation =
+ InfoTypeTransformations.InfoTypeTransformation.newBuilder()
+ .addInfoTypes(emailAddress)
+ .setPrimitiveTransformation(primitiveTransformation)
+ .build();
+ return DeidentifyConfig.newBuilder()
+ .setInfoTypeTransformations(
+ InfoTypeTransformations.newBuilder().addTransformations(infoTypeTransformation).build())
+ .build();
+ }
+
+ private static class VerifyInspectionResult
+ implements SerializableFunction<Iterable<List<Finding>>, Void> {
+ @Override
+ public Void apply(Iterable<List<Finding>> input) {
+ List<Boolean> matches = new ArrayList<>();
+ input.forEach(
+ resultList ->
+ matches.add(
+ resultList.stream()
+ .anyMatch(finding -> finding.getInfoType().equals(emailAddress))));
+ assertEquals(Boolean.TRUE, matches.contains(Boolean.TRUE));
+ return null;
+ }
+ }
+}