You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/05/18 23:03:39 UTC

[GitHub] [beam] tysonjh commented on a change in pull request #11566: [BEAM-9723] Add DLP integration transforms

tysonjh commented on a change in pull request #11566:
URL: https://github.com/apache/beam/pull/11566#discussion_r426831619



##########
File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/BatchRequestForDLP.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.privacy.dlp.v2.Table;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DoFn batching the input PCollection into bigger requests in order to better utilize the Cloud DLP
+ * service.
+ */
+@Experimental
+class BatchRequestForDLP extends DoFn<KV<String, Table.Row>, KV<String, Iterable<Table.Row>>> {
+  public static final Logger LOG = LoggerFactory.getLogger(BatchRequestForDLP.class);
+
+  private final Counter numberOfRowsBagged =
+      Metrics.counter(BatchRequestForDLP.class, "numberOfRowsBagged");
+  private final Integer batchSize;
+
+  @StateId("elementsBag")
+  private final StateSpec<BagState<KV<String, Table.Row>>> elementsBag = StateSpecs.bag();
+
+  @TimerId("eventTimer")
+  private final TimerSpec eventTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+  public BatchRequestForDLP(Integer batchSize) {
+    this.batchSize = batchSize;
+  }
+
+  @ProcessElement
+  public void process(
+      @Element KV<String, Table.Row> element,
+      @StateId("elementsBag") BagState<KV<String, Table.Row>> elementsBag,
+      @TimerId("eventTimer") Timer eventTimer,
+      BoundedWindow w) {
+    elementsBag.add(element);
+    eventTimer.set(w.maxTimestamp());
+  }
+
+  @OnTimer("eventTimer")
+  public void onTimer(
+      @StateId("elementsBag") BagState<KV<String, Table.Row>> elementsBag,
+      OutputReceiver<KV<String, Iterable<Table.Row>>> output) {
+    String key = elementsBag.read().iterator().next().getKey();

Review comment:
       Is there a guarantee that at least one element will be in the elementsBag iterator or is there a chance for a NoSuchElementsException on the `next()` call?

##########
File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyText.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.dlp.v2.DlpServiceClient;
+import com.google.privacy.dlp.v2.ContentItem;
+import com.google.privacy.dlp.v2.DeidentifyConfig;
+import com.google.privacy.dlp.v2.DeidentifyContentRequest;
+import com.google.privacy.dlp.v2.DeidentifyContentResponse;
+import com.google.privacy.dlp.v2.FieldId;
+import com.google.privacy.dlp.v2.InspectConfig;
+import com.google.privacy.dlp.v2.ProjectName;
+import com.google.privacy.dlp.v2.Table;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * A {@link PTransform} connecting to Cloud DLP and deidentifying text according to provided
+ * settings. The transform supports both CSV formatted input data and unstructured input.
+ *
+ * <p>If the csvHeader property is set, csvDelimiter also should be, else the results will be
+ * incorrect. If csvHeader is not set, input is assumed to be unstructured.
+ *
+ * <p>Either inspectTemplateName (String) or inspectConfig {@link InspectConfig} need to be set. The
+ * situation is the same with deidentifyTemplateName and deidentifyConfig ({@link DeidentifyConfig}.
+ *
+ * <p>Batch size defines how big are batches sent to DLP at once in bytes.

Review comment:
       Would you move the comments about how a method works, or what it does, to the method definition please?

##########
File path: sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DLPTextOperationsIT.java
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import static org.junit.Assert.assertTrue;
+
+import com.google.privacy.dlp.v2.CharacterMaskConfig;
+import com.google.privacy.dlp.v2.DeidentifyConfig;
+import com.google.privacy.dlp.v2.DeidentifyContentResponse;
+import com.google.privacy.dlp.v2.Finding;
+import com.google.privacy.dlp.v2.InfoType;
+import com.google.privacy.dlp.v2.InfoTypeTransformations;
+import com.google.privacy.dlp.v2.InspectConfig;
+import com.google.privacy.dlp.v2.InspectContentResponse;
+import com.google.privacy.dlp.v2.Likelihood;
+import com.google.privacy.dlp.v2.PrimitiveTransformation;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class DLPTextOperationsIT {
+  @Rule public TestPipeline testPipeline = TestPipeline.create();
+
+  private static final String IDENTIFYING_TEXT = "mary.sue@example.com";
+  private static InfoType emailAddress = InfoType.newBuilder().setName("EMAIL_ADDRESS").build();
+  private static final InspectConfig inspectConfig =
+      InspectConfig.newBuilder()
+          .addInfoTypes(emailAddress)
+          .setMinLikelihood(Likelihood.LIKELY)
+          .build();
+
+  @Test
+  public void inspectsText() {
+    String projectId = testPipeline.getOptions().as(GcpOptions.class).getProject();
+    PCollection<KV<String, InspectContentResponse>> inspectionResult =
+        testPipeline
+            .apply(Create.of(KV.of("", IDENTIFYING_TEXT)))
+            .apply(
+                DLPInspectText.newBuilder()
+                    .setBatchSize(52400)

Review comment:
       Can you use DLP_PAYLOAD_LIMIT instead?

##########
File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/BatchRequestForDLP.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.privacy.dlp.v2.Table;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DoFn batching the input PCollection into bigger requests in order to better utilize the Cloud DLP
+ * service.
+ */
+@Experimental
+class BatchRequestForDLP extends DoFn<KV<String, Table.Row>, KV<String, Iterable<Table.Row>>> {
+  public static final Logger LOG = LoggerFactory.getLogger(BatchRequestForDLP.class);
+
+  private final Counter numberOfRowsBagged =
+      Metrics.counter(BatchRequestForDLP.class, "numberOfRowsBagged");
+  private final Integer batchSize;
+
+  @StateId("elementsBag")
+  private final StateSpec<BagState<KV<String, Table.Row>>> elementsBag = StateSpecs.bag();
+
+  @TimerId("eventTimer")
+  private final TimerSpec eventTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+  public BatchRequestForDLP(Integer batchSize) {

Review comment:
       Please add a comment. It would also be beneficial to name this variable with a unit like `batchSizeBytes`.

##########
File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyText.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.dlp.v2.DlpServiceClient;
+import com.google.privacy.dlp.v2.ContentItem;
+import com.google.privacy.dlp.v2.DeidentifyConfig;
+import com.google.privacy.dlp.v2.DeidentifyContentRequest;
+import com.google.privacy.dlp.v2.DeidentifyContentResponse;
+import com.google.privacy.dlp.v2.FieldId;
+import com.google.privacy.dlp.v2.InspectConfig;
+import com.google.privacy.dlp.v2.ProjectName;
+import com.google.privacy.dlp.v2.Table;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * A {@link PTransform} connecting to Cloud DLP and deidentifying text according to provided
+ * settings. The transform supports both CSV formatted input data and unstructured input.
+ *
+ * <p>If the csvHeader property is set, csvDelimiter also should be, else the results will be
+ * incorrect. If csvHeader is not set, input is assumed to be unstructured.
+ *
+ * <p>Either inspectTemplateName (String) or inspectConfig {@link InspectConfig} need to be set. The
+ * situation is the same with deidentifyTemplateName and deidentifyConfig ({@link DeidentifyConfig}.
+ *
+ * <p>Batch size defines how big are batches sent to DLP at once in bytes.
+ *
+ * <p>The transform outputs {@link KV} of {@link String} (eg. filename) and {@link
+ * DeidentifyContentResponse}, which will contain {@link Table} of results for the user to consume.
+ */
+@Experimental
+@AutoValue
+public abstract class DLPDeidentifyText
+    extends PTransform<
+        PCollection<KV<String, String>>, PCollection<KV<String, DeidentifyContentResponse>>> {
+
+  @Nullable
+  public abstract String inspectTemplateName();
+
+  @Nullable
+  public abstract String deidentifyTemplateName();
+
+  @Nullable
+  public abstract InspectConfig inspectConfig();
+
+  @Nullable
+  public abstract DeidentifyConfig deidentifyConfig();
+
+  @Nullable
+  public abstract PCollectionView<List<String>> csvHeader();
+
+  @Nullable
+  public abstract String csvDelimiter();
+
+  public abstract Integer batchSize();
+
+  public abstract String projectId();
+
+  @AutoValue.Builder
+  public abstract static class Builder {
+    public abstract Builder setInspectTemplateName(String inspectTemplateName);
+
+    public abstract Builder setCsvHeader(PCollectionView<List<String>> csvHeader);
+
+    public abstract Builder setCsvDelimiter(String delimiter);
+
+    public abstract Builder setBatchSize(Integer batchSize);
+
+    public abstract Builder setProjectId(String projectId);
+
+    public abstract Builder setDeidentifyTemplateName(String deidentifyTemplateName);
+
+    public abstract Builder setInspectConfig(InspectConfig inspectConfig);
+
+    public abstract Builder setDeidentifyConfig(DeidentifyConfig deidentifyConfig);
+
+    public abstract DLPDeidentifyText build();
+  }
+
+  public static DLPDeidentifyText.Builder newBuilder() {
+    return new AutoValue_DLPDeidentifyText.Builder();
+  }
+
+  /**
+   * The transform batches the contents of input PCollection and then calls Cloud DLP service to
+   * perform the deidentification.
+   *
+   * @param input input PCollection
+   * @return PCollection after transformations
+   */
+  @Override
+  public PCollection<KV<String, DeidentifyContentResponse>> expand(
+      PCollection<KV<String, String>> input) {
+    return input
+        .apply(ParDo.of(new MapStringToDlpRow(csvDelimiter())))
+        .apply("Batch Contents", ParDo.of(new BatchRequestForDLP(batchSize())))
+        .apply(
+            "DLPDeidentify",
+            ParDo.of(
+                new DeidentifyText(
+                    projectId(),
+                    inspectTemplateName(),
+                    deidentifyTemplateName(),
+                    inspectConfig(),
+                    deidentifyConfig(),
+                    csvHeader())));
+  }
+
+  static class DeidentifyText
+      extends DoFn<KV<String, Iterable<Table.Row>>, KV<String, DeidentifyContentResponse>> {
+    private final String projectId;
+    private final String inspectTemplateName;
+    private final String deidentifyTemplateName;
+    private final InspectConfig inspectConfig;
+    private final DeidentifyConfig deidentifyConfig;
+    private final PCollectionView<List<String>> csvHeaders;
+    private transient DeidentifyContentRequest.Builder requestBuilder;
+
+    @Setup
+    public void setup() throws IOException {
+      requestBuilder =
+          DeidentifyContentRequest.newBuilder().setParent(ProjectName.of(projectId).toString());
+      if (inspectTemplateName != null) {
+        requestBuilder.setInspectTemplateName(inspectTemplateName);
+      }
+      if (inspectConfig != null) {
+        requestBuilder.setInspectConfig(inspectConfig);
+      }
+      if (inspectConfig == null && inspectTemplateName == null) {
+        throw new IllegalArgumentException(
+            "Either inspectConfig or inspectTemplateName need to be set!");
+      }

Review comment:
       See my earlier comment about moving configuration exceptions like this to pipeline construction time. Doing so will give earlier feedback to the user and avoid having to spin up workers and processing part of the pipeline before running into a configuration issue.

##########
File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/BatchRequestForDLP.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.privacy.dlp.v2.Table;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DoFn batching the input PCollection into bigger requests in order to better utilize the Cloud DLP
+ * service.
+ */
+@Experimental
+class BatchRequestForDLP extends DoFn<KV<String, Table.Row>, KV<String, Iterable<Table.Row>>> {
+  public static final Logger LOG = LoggerFactory.getLogger(BatchRequestForDLP.class);
+
+  private final Counter numberOfRowsBagged =
+      Metrics.counter(BatchRequestForDLP.class, "numberOfRowsBagged");
+  private final Integer batchSize;
+
+  @StateId("elementsBag")
+  private final StateSpec<BagState<KV<String, Table.Row>>> elementsBag = StateSpecs.bag();
+
+  @TimerId("eventTimer")
+  private final TimerSpec eventTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+  public BatchRequestForDLP(Integer batchSize) {
+    this.batchSize = batchSize;
+  }
+
+  @ProcessElement
+  public void process(
+      @Element KV<String, Table.Row> element,
+      @StateId("elementsBag") BagState<KV<String, Table.Row>> elementsBag,
+      @TimerId("eventTimer") Timer eventTimer,
+      BoundedWindow w) {
+    elementsBag.add(element);
+    eventTimer.set(w.maxTimestamp());
+  }
+
+  @OnTimer("eventTimer")
+  public void onTimer(
+      @StateId("elementsBag") BagState<KV<String, Table.Row>> elementsBag,
+      OutputReceiver<KV<String, Iterable<Table.Row>>> output) {
+    String key = elementsBag.read().iterator().next().getKey();
+    AtomicInteger bufferSize = new AtomicInteger();
+    List<Table.Row> rows = new ArrayList<>();
+    elementsBag
+        .read()
+        .forEach(
+            element -> {
+              int elementSize = element.getValue().getSerializedSize();
+              boolean clearBuffer = bufferSize.intValue() + elementSize > batchSize;
+              if (clearBuffer) {
+                numberOfRowsBagged.inc(rows.size());
+                LOG.debug("Clear Buffer {} , Key {}", bufferSize.intValue(), element.getKey());
+                output.output(KV.of(element.getKey(), rows));
+                rows.clear();
+                bufferSize.set(0);
+              }
+              rows.add(element.getValue());
+              bufferSize.getAndAdd(element.getValue().getSerializedSize());
+            });
+    if (!rows.isEmpty()) {
+      LOG.debug("Remaining rows {}", rows.size());

Review comment:
       Nit: This log message would be more clear if it said the action being taken like.. 'Outputting remaining {} rows'.

##########
File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyText.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.dlp.v2.DlpServiceClient;
+import com.google.privacy.dlp.v2.ContentItem;
+import com.google.privacy.dlp.v2.DeidentifyConfig;
+import com.google.privacy.dlp.v2.DeidentifyContentRequest;
+import com.google.privacy.dlp.v2.DeidentifyContentResponse;
+import com.google.privacy.dlp.v2.FieldId;
+import com.google.privacy.dlp.v2.InspectConfig;
+import com.google.privacy.dlp.v2.ProjectName;
+import com.google.privacy.dlp.v2.Table;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * A {@link PTransform} connecting to Cloud DLP and deidentifying text according to provided
+ * settings. The transform supports both CSV formatted input data and unstructured input.
+ *
+ * <p>If the csvHeader property is set, csvDelimiter also should be, else the results will be
+ * incorrect. If csvHeader is not set, input is assumed to be unstructured.

Review comment:
       An incorrect configuration should result in an exception at pipeline construction time if possible. The sooner an exception can be raised to the user the better (e.g. pipeline construction vs. pipeline runtime).
   
   A couple options are,
     1. Add precondition checks into the Builder for the AutoValue. You could do this by implementing your own `public DLPDeidentifyText build()` method the checks preconditions and then calls a generated `autoBuild` method. 
     2. Add precondition checks into the constructor of the DoFn.

##########
File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/BatchRequestForDLP.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.privacy.dlp.v2.Table;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DoFn batching the input PCollection into bigger requests in order to better utilize the Cloud DLP
+ * service.
+ */
+@Experimental
+class BatchRequestForDLP extends DoFn<KV<String, Table.Row>, KV<String, Iterable<Table.Row>>> {
+  public static final Logger LOG = LoggerFactory.getLogger(BatchRequestForDLP.class);
+
+  private final Counter numberOfRowsBagged =
+      Metrics.counter(BatchRequestForDLP.class, "numberOfRowsBagged");
+  private final Integer batchSize;
+
+  @StateId("elementsBag")
+  private final StateSpec<BagState<KV<String, Table.Row>>> elementsBag = StateSpecs.bag();
+
+  @TimerId("eventTimer")
+  private final TimerSpec eventTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+  public BatchRequestForDLP(Integer batchSize) {
+    this.batchSize = batchSize;
+  }
+
+  @ProcessElement
+  public void process(
+      @Element KV<String, Table.Row> element,
+      @StateId("elementsBag") BagState<KV<String, Table.Row>> elementsBag,
+      @TimerId("eventTimer") Timer eventTimer,
+      BoundedWindow w) {
+    elementsBag.add(element);
+    eventTimer.set(w.maxTimestamp());
+  }
+
+  @OnTimer("eventTimer")
+  public void onTimer(
+      @StateId("elementsBag") BagState<KV<String, Table.Row>> elementsBag,
+      OutputReceiver<KV<String, Iterable<Table.Row>>> output) {
+    String key = elementsBag.read().iterator().next().getKey();
+    AtomicInteger bufferSize = new AtomicInteger();
+    List<Table.Row> rows = new ArrayList<>();
+    elementsBag
+        .read()
+        .forEach(
+            element -> {
+              int elementSize = element.getValue().getSerializedSize();
+              boolean clearBuffer = bufferSize.intValue() + elementSize > batchSize;
+              if (clearBuffer) {
+                numberOfRowsBagged.inc(rows.size());
+                LOG.debug("Clear Buffer {} , Key {}", bufferSize.intValue(), element.getKey());

Review comment:
       Nit: Can you move this to the first line of the if block to avoid splitting the code like this? Also it would be nice to have the unit in the log message for the bufferSize (e.g bytes).

##########
File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyText.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.dlp.v2.DlpServiceClient;
+import com.google.privacy.dlp.v2.ContentItem;
+import com.google.privacy.dlp.v2.DeidentifyConfig;
+import com.google.privacy.dlp.v2.DeidentifyContentRequest;
+import com.google.privacy.dlp.v2.DeidentifyContentResponse;
+import com.google.privacy.dlp.v2.FieldId;
+import com.google.privacy.dlp.v2.InspectConfig;
+import com.google.privacy.dlp.v2.ProjectName;
+import com.google.privacy.dlp.v2.Table;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * A {@link PTransform} connecting to Cloud DLP and deidentifying text according to provided
+ * settings. The transform supports both CSV formatted input data and unstructured input.
+ *
+ * <p>If the csvHeader property is set, csvDelimiter also should be, else the results will be
+ * incorrect. If csvHeader is not set, input is assumed to be unstructured.
+ *
+ * <p>Either inspectTemplateName (String) or inspectConfig {@link InspectConfig} need to be set. The
+ * situation is the same with deidentifyTemplateName and deidentifyConfig ({@link DeidentifyConfig}.
+ *
+ * <p>Batch size defines how big are batches sent to DLP at once in bytes.
+ *
+ * <p>The transform outputs {@link KV} of {@link String} (eg. filename) and {@link
+ * DeidentifyContentResponse}, which will contain {@link Table} of results for the user to consume.
+ */
+@Experimental
+@AutoValue
+public abstract class DLPDeidentifyText
+    extends PTransform<
+        PCollection<KV<String, String>>, PCollection<KV<String, DeidentifyContentResponse>>> {
+
+  @Nullable
+  public abstract String inspectTemplateName();
+
+  @Nullable
+  public abstract String deidentifyTemplateName();
+
+  @Nullable
+  public abstract InspectConfig inspectConfig();
+
+  @Nullable
+  public abstract DeidentifyConfig deidentifyConfig();
+
+  @Nullable
+  public abstract PCollectionView<List<String>> csvHeader();
+
+  @Nullable
+  public abstract String csvDelimiter();

Review comment:
       Since the delimiter is configurable, what about dropping the 'csv'? You could add details in the comments as to where the delimiter applies and the default value.

##########
File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPReidentifyText.java
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.dlp.v2.DlpServiceClient;
+import com.google.privacy.dlp.v2.ContentItem;
+import com.google.privacy.dlp.v2.DeidentifyConfig;
+import com.google.privacy.dlp.v2.FieldId;
+import com.google.privacy.dlp.v2.InspectConfig;
+import com.google.privacy.dlp.v2.ProjectName;
+import com.google.privacy.dlp.v2.ReidentifyContentRequest;
+import com.google.privacy.dlp.v2.ReidentifyContentResponse;
+import com.google.privacy.dlp.v2.Table;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} connecting to Cloud DLP and inspecting text for identifying data according
+ * to provided settings.
+ *
+ * <p>Either inspectTemplateName (String) or inspectConfig {@link InspectConfig} need to be set, the
+ * same goes for reidentifyTemplateName or reidentifyConfig.
+ *
+ * <p>Batch size defines how big are batches sent to DLP at once in bytes.
+ */
+@Experimental
+@AutoValue
+public abstract class DLPReidentifyText
+    extends PTransform<
+        PCollection<KV<String, String>>, PCollection<KV<String, ReidentifyContentResponse>>> {
+
+  public static final Logger LOG = LoggerFactory.getLogger(DLPInspectText.class);
+
+  public static final Integer DLP_PAYLOAD_LIMIT = 52400;

Review comment:
       Having the units in the name is helpful when the type is non-descriptive. Where did this number come from?

##########
File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyText.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.dlp.v2.DlpServiceClient;
+import com.google.privacy.dlp.v2.ContentItem;
+import com.google.privacy.dlp.v2.DeidentifyConfig;
+import com.google.privacy.dlp.v2.DeidentifyContentRequest;
+import com.google.privacy.dlp.v2.DeidentifyContentResponse;
+import com.google.privacy.dlp.v2.FieldId;
+import com.google.privacy.dlp.v2.InspectConfig;
+import com.google.privacy.dlp.v2.ProjectName;
+import com.google.privacy.dlp.v2.Table;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * A {@link PTransform} connecting to Cloud DLP and deidentifying text according to provided
+ * settings. The transform supports both CSV formatted input data and unstructured input.
+ *
+ * <p>If the csvHeader property is set, csvDelimiter also should be, else the results will be
+ * incorrect. If csvHeader is not set, input is assumed to be unstructured.
+ *
+ * <p>Either inspectTemplateName (String) or inspectConfig {@link InspectConfig} need to be set. The
+ * situation is the same with deidentifyTemplateName and deidentifyConfig ({@link DeidentifyConfig}.
+ *
+ * <p>Batch size defines how big are batches sent to DLP at once in bytes.
+ *
+ * <p>The transform outputs {@link KV} of {@link String} (eg. filename) and {@link
+ * DeidentifyContentResponse}, which will contain {@link Table} of results for the user to consume.
+ */
+@Experimental
+@AutoValue
+public abstract class DLPDeidentifyText
+    extends PTransform<
+        PCollection<KV<String, String>>, PCollection<KV<String, DeidentifyContentResponse>>> {
+
+  @Nullable
+  public abstract String inspectTemplateName();

Review comment:
       It would be helpful if these abstract methods, and possibly those in the builder, had comments.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org