Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/02/16 15:52:46 UTC

[GitHub] [beam] KhaninArtur opened a new pull request #13995: [BEAM-11322] Apache Beam Example to tokenize sensitive data

KhaninArtur opened a new pull request #13995:
URL: https://github.com/apache/beam/pull/13995


   Some users may want to protect their sensitive data using tokenization.
   
   We propose adding a Beam example template that demonstrates a Beam transform for protecting sensitive data through tokenization. In our example, an external service performs the tokenization.
   
   At a high level, the pipeline:
   
   - supports batch (GCS) and streaming (Pub/Sub) input sources
   - tokenizes sensitive data via an external REST service (we plan to use Protegrity)
   - outputs tokenized data into BigQuery or BigTable
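
To make the tokenization step concrete, here is a minimal plain-Java sketch; it is not code from this PR. It assumes a record is a flat map of field names to string values and that the set of sensitive field names is known up front. `FieldTokenizerSketch`, `tokenize`, and `fakeToken` are hypothetical names, and `fakeToken` is only a local stand-in for the external tokenization service.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.UnaryOperator;

/** Illustrative only: replaces the values of sensitive fields with opaque tokens. */
final class FieldTokenizerSketch {

  /** Hypothetical stand-in for the external service: derives a deterministic token locally. */
  static String fakeToken(String value) {
    return "tok_" + UUID.nameUUIDFromBytes(value.getBytes(StandardCharsets.UTF_8));
  }

  /** Returns a copy of the record in which every sensitive field holds a token instead. */
  static Map<String, String> tokenize(
      Map<String, String> record, Set<String> sensitiveFields, UnaryOperator<String> tokenizer) {
    Map<String, String> out = new LinkedHashMap<>();
    for (Map.Entry<String, String> entry : record.entrySet()) {
      boolean sensitive = sensitiveFields.contains(entry.getKey());
      // Non-sensitive values pass through unchanged; the input map is never modified.
      out.put(entry.getKey(), sensitive ? tokenizer.apply(entry.getValue()) : entry.getValue());
    }
    return out;
  }
}
```

In a real pipeline the `tokenizer` argument would wrap the call to the external service, and rows would presumably be sent in batches rather than one value at a time.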
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [x] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [x] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pabloem commented on pull request #13995: [BEAM-11322] Apache Beam Example to tokenize sensitive data

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #13995:
URL: https://github.com/apache/beam/pull/13995#issuecomment-786266997


   Hi @KhaninArtur! Thanks for your contribution. Is this PR ready to be reviewed?





[GitHub] [beam] KhaninArtur commented on pull request #13995: [BEAM-11322] Apache Beam Example to tokenize sensitive data

Posted by GitBox <gi...@apache.org>.
KhaninArtur commented on pull request #13995:
URL: https://github.com/apache/beam/pull/13995#issuecomment-792804891


   Hi, @pabloem!
   Are there any updates regarding this PR?





[GitHub] [beam] KhaninArtur commented on a change in pull request #13995: [BEAM-11322] Apache Beam Example to tokenize sensitive data

Posted by GitBox <gi...@apache.org>.
KhaninArtur commented on a change in pull request #13995:
URL: https://github.com/apache/beam/pull/13995#discussion_r598693312



##########
File path: examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/DataProtectors.java
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.datatokenization.transforms;
+
+import static org.apache.beam.sdk.util.RowJsonUtils.rowToJson;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects.firstNonNull;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.beam.examples.complete.datatokenization.utils.FailsafeElement;
+import org.apache.beam.examples.complete.datatokenization.utils.FailsafeElementCoder;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.RowJson;
+import org.apache.beam.sdk.util.RowJsonUtils;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.gson.Gson;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.gson.JsonArray;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.gson.JsonObject;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The {@link DataProtectors} class provides transforms that buffer input rows into batches and send
+ * a batch once the number of buffered rows reaches the specified batch size. The last batch is sent
+ * when the final row arrives at the DoFn, even if it holds fewer rows than the batch size.
+ */
+public class DataProtectors {
+
+  /** Logger for class. */
+  private static final Logger LOG = LoggerFactory.getLogger(DataProtectors.class);
+
+  public static final String ID_FIELD_NAME = "ID";
+
+  /**
+   * The {@link RowToTokenizedRow} transform converts {@link Row} to {@link TableRow} objects. The
+   * transform accepts a {@link FailsafeElement} object so the original payload of the incoming
+   * record can be maintained across multiple series of transforms.
+   */
+  @AutoValue
+  public abstract static class RowToTokenizedRow<T>
+      extends PTransform<PCollection<KV<Integer, Row>>, PCollectionTuple> {
+
+    public static <T> Builder<T> newBuilder() {
+      return new AutoValue_DataProtectors_RowToTokenizedRow.Builder<>();
+    }
+
+    public abstract TupleTag<Row> successTag();
+
+    public abstract TupleTag<FailsafeElement<Row, Row>> failureTag();
+
+    public abstract Schema schema();
+
+    public abstract int batchSize();
+
+    public abstract String rpcURI();
+
+    @Override
+    public PCollectionTuple expand(PCollection<KV<Integer, Row>> inputRows) {
+      FailsafeElementCoder<Row, Row> coder =
+          FailsafeElementCoder.of(RowCoder.of(schema()), RowCoder.of(schema()));
+      PCollectionTuple pCollectionTuple =
+          inputRows.apply(
+              "Tokenize",
+              ParDo.of(new TokenizationFn(schema(), batchSize(), rpcURI(), failureTag()))
+                  .withOutputTags(successTag(), TupleTagList.of(failureTag())));
+      return PCollectionTuple.of(
+              successTag(), pCollectionTuple.get(successTag()).setRowSchema(schema()))
+          .and(failureTag(), pCollectionTuple.get(failureTag()).setCoder(coder));
+    }
+
+    /** Builder for {@link RowToTokenizedRow}. */
+    @AutoValue.Builder
+    public abstract static class Builder<T> {
+
+      public abstract Builder<T> setSuccessTag(TupleTag<Row> successTag);
+
+      public abstract Builder<T> setFailureTag(TupleTag<FailsafeElement<Row, Row>> failureTag);
+
+      public abstract Builder<T> setSchema(Schema schema);
+
+      public abstract Builder<T> setBatchSize(int batchSize);
+
+      public abstract Builder<T> setRpcURI(String rpcURI);
+
+      public abstract RowToTokenizedRow<T> build();
+    }
+  }
+
+  /** Class implements stateful doFn for data tokenization using remote RPC. */
+  @SuppressWarnings("initialization.static.fields.uninitialized")
+  public static class TokenizationFn extends DoFn<KV<Integer, Row>, Row> {
+
+    private static Schema schemaToRpc;
+    private static CloseableHttpClient httpclient;
+    private static ObjectMapper objectMapperSerializerForSchema;
+    private static ObjectMapper objectMapperDeserializerForSchema;
+
+    private final Schema schema;
+    private final int batchSize;
+    private final String rpcURI;
+    private final TupleTag<FailsafeElement<Row, Row>> failureTag;
+
+    @StateId("buffer")
+    private final StateSpec<BagState<Row>> bufferedEvents;
+
+    @StateId("count")
+    private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();
+
+    @TimerId("expiry")
+    private final TimerSpec expirySpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    private Map<String, Row> inputRowsWithIds;
+
+    public TokenizationFn(
+        Schema schema,
+        int batchSize,
+        String rpcURI,
+        TupleTag<FailsafeElement<Row, Row>> failureTag) {
+      this.schema = schema;
+      this.batchSize = batchSize;
+      this.rpcURI = rpcURI;
+      bufferedEvents = StateSpecs.bag(RowCoder.of(schema));
+      this.failureTag = failureTag;
+      this.inputRowsWithIds = new HashMap<>();
+    }
+
+    @Setup
+    public void setup() {
+
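+      // Copy the field list so that adding the ID field does not mutate the input schema.
+      List<Field> fields = new ArrayList<>(schema.getFields());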
+      fields.add(Field.of(ID_FIELD_NAME, FieldType.STRING));
+      schemaToRpc = new Schema(fields);
+
+      objectMapperSerializerForSchema =
+          RowJsonUtils.newObjectMapperWith(RowJson.RowJsonSerializer.forSchema(schemaToRpc));
+
+      objectMapperDeserializerForSchema =
+          RowJsonUtils.newObjectMapperWith(RowJson.RowJsonDeserializer.forSchema(schemaToRpc));
+
+      httpclient = HttpClients.createDefault();
+    }
+
+    @Teardown
+    public void close() {
+      try {
+        httpclient.close();
+      } catch (IOException exception) {
+        String exceptionMessage = exception.getMessage();
+        if (exceptionMessage != null) {
+          LOG.warn("Can't close connection: {}", exceptionMessage);
+        }
+      }
+    }
+
+    @OnTimer("expiry")
+    public void onExpiry(OnTimerContext context, @StateId("buffer") BagState<Row> bufferState) {
+      boolean isEmpty = firstNonNull(bufferState.isEmpty().read(), true);
+      if (!isEmpty) {
+        processBufferedRows(bufferState.read(), context);
+        bufferState.clear();
+      }
+    }
+
+    @ProcessElement
+    public void process(
+        ProcessContext context,
+        BoundedWindow window,
+        @StateId("buffer") BagState<Row> bufferState,
+        @StateId("count") ValueState<Integer> countState,
+        @TimerId("expiry") Timer expiryTimer) {
+
+      expiryTimer.set(window.maxTimestamp());
+
+      int count = firstNonNull(countState.read(), 0);
+      count++;
+      countState.write(count);
+      bufferState.add(context.element().getValue());
+
+      if (count >= batchSize) {
+        processBufferedRows(bufferState.read(), context);
+        bufferState.clear();
+        countState.clear();
+      }
+    }
+
+    @SuppressWarnings("argument.type.incompatible")
+    private void processBufferedRows(Iterable<Row> rows, WindowedContext context) {

Review comment:
       Thank you so much for your suggestion, @pabloem! `GroupIntoBatches` works really well; I replaced the stateful DoFn with it.
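
For readers following the thread, the batching behavior that the stateful DoFn in the hunk above implements (emit a batch once `batchSize` rows are buffered, then flush whatever remains when the window expires) can be sketched in plain Java. This is an illustration only: it is neither the PR's code nor Beam's `GroupIntoBatches` transform, and `BatchBufferSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Illustrative only: buffers elements and hands them downstream in fixed-size batches. */
final class BatchBufferSketch<T> {
  private final int batchSize;
  private final Consumer<List<T>> sink;
  private final List<T> buffer = new ArrayList<>();

  BatchBufferSketch(int batchSize, Consumer<List<T>> sink) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    this.batchSize = batchSize;
    this.sink = sink;
  }

  /** Adds one element and emits a batch as soon as batchSize elements are buffered. */
  void add(T element) {
    buffer.add(element);
    if (buffer.size() >= batchSize) {
      flush();
    }
  }

  /** Emits whatever is buffered, even a partial batch; does nothing if the buffer is empty. */
  void flush() {
    if (!buffer.isEmpty()) {
      // Hand over a copy so that clearing the buffer does not affect the emitted batch.
      sink.accept(new ArrayList<>(buffer));
      buffer.clear();
    }
  }
}
```

Calling `flush()` after the last `add` plays the role of the `expiry` timer in the DoFn: it sends the final, possibly smaller, batch. With `GroupIntoBatches`, Beam handles this buffering and flushing for keyed input, which is why the hand-written state and timer code could be dropped.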







[GitHub] [beam] KhaninArtur commented on a change in pull request #13995: [BEAM-11322] Apache Beam Example to tokenize sensitive data

Posted by GitBox <gi...@apache.org>.
KhaninArtur commented on a change in pull request #13995:
URL: https://github.com/apache/beam/pull/13995#discussion_r594975541



##########
File path: examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/README.md
##########
@@ -0,0 +1,169 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Apache Beam pipeline example to tokenize data using remote RPC server
+
+This directory contains an Apache Beam example that creates a pipeline to read data from one of
+the supported sources, tokenize data with external API calls to remote RPC server, and write data into one of the supported sinks.
+
+Supported data formats:
+
+- JSON
+- CSV
+
+Supported input sources:
+
+- File system
+- [Google Pub/Sub](https://cloud.google.com/pubsub)
+
+Supported destination sinks:
+
+- File system
+- [Google Cloud BigQuery](https://cloud.google.com/bigquery)
+- [Cloud BigTable](https://cloud.google.com/bigtable)
+
+Supported data schema format:
+
+- JSON with an array of fields described in BigQuery format
+
+In the main scenario, the template will create an Apache Beam pipeline that will read data in CSV or
+JSON format from a specified input source, send the data to an external processing server, receive
+processed data, and write it into a specified output sink.
+
+## Requirements
+
+- Java 8
+- One of the supported sources to read data from
+- One of the supported destination sinks to write data into
+- A configured RPC to tokenize data
+
+## Getting Started
+
+This section describes what is needed to get the template up and running.
+
+- Gradle preparation
+- Local execution
+- Running as a Dataflow Template
+    - Setting Up Project Environment
+    - Build Data Tokenization Dataflow Flex Template
+    - Creating the Dataflow Flex Template
+    - Executing Template
+
+## Gradle preparation
+
+To run this example, your `build.gradle` file should contain the following task to execute the pipeline:
+
+```
+task execute (type:JavaExec) {
+    main = System.getProperty("mainClass")
+    classpath = sourceSets.main.runtimeClasspath
+    systemProperties System.getProperties()
+    args System.getProperty("exec.args", "").split()
+}
+```
+
+This task lets you run the pipeline with the following command:
+
+```bash
+gradle clean execute -DmainClass=org.apache.beam.examples.complete.datatokenization.DataTokenization \
+     -Dexec.args="--<argument>=<value> --<argument>=<value>"
+```
+
+## Running the pipeline
+
+To execute this pipeline, specify the parameters:
+
+- Data schema
+    - **dataSchemaPath**: Path to data schema (JSON format) compatible with BigQuery.
+- One of the following input sources:
+    - File System
+        - **inputFilePattern**: File pattern for the files to read data from
+        - **inputFileFormat**: File format of input files. Supported formats: JSON, CSV
+        - If the input data is in CSV format:
+            - **csvContainsHeaders**: `true` if the input file(s) contain headers,
+              and `false` otherwise
+            - **csvDelimiter**: Delimiting character in CSV. Default: the delimiter provided in
+              csvFormat
+            - **csvFormat**: CSV format according to Apache Commons CSV. Default:
+              [Apache Commons CSV default](https://static.javadoc.io/org.apache.commons/commons-csv/1.7/org/apache/commons/csv/CSVFormat.html#DEFAULT).
+              Must exactly match one of the format names listed
+              at: https://static.javadoc.io/org.apache.commons/commons-csv/1.7/org/apache/commons/csv/CSVFormat.Predefined.html
+    - Google Pub/Sub
+        - **pubsubTopic**: The Cloud Pub/Sub topic to read from, in the format
+          'projects/yourproject/topics/yourtopic'
+- One of the following output sinks:
+    - File System
+        - **outputDirectory**: Directory to write data to
+        - **outputFileFormat**: File format of output files. Supported formats: JSON, CSV
+        - **windowDuration**: The window duration in which data will be written. Should be specified
+          only for the 'Pub/Sub -> GCS' case. Defaults to 30s.
+
+          Allowed formats are:
+            - Ns (for seconds, example: 5s),
+            - Nm (for minutes, example: 12m),
+            - Nh (for hours, example: 2h).
+    - Google Cloud BigQuery
+        - **bigQueryTableName**: Cloud BigQuery table name to write into
+        - **tempLocation**: Folder in a Google Cloud Storage bucket, which is needed for
+          BigQuery to handle data writing
+    - Cloud BigTable
+        - **bigTableProjectId**: Id of the project where the Cloud BigTable instance to write into
+          is located
+        - **bigTableInstanceId**: Id of the Cloud BigTable instance to write into
+        - **bigTableTableId**: Id of the Cloud BigTable table to write into
+        - **bigTableKeyColumnName**: Column name to use as a key in Cloud BigTable
+        - **bigTableColumnFamilyName**: Column family name to use in Cloud BigTable
+- RPC server parameters
+    - **rpcUri**: URI for the API calls to RPC server
+    - **batchSize**: Size of the batch to send to RPC server per request
+
+The template also accepts the following optional parameter:
+
+- **nonTokenizedDeadLetterPath**: Folder where data that failed tokenization will be stored
+
+Specify the parameters in the following format:
+
+```bash
+--dataSchemaPath="path-to-data-schema-in-json-format"
+--inputFilePattern="path-pattern-to-input-data"
+--outputDirectory="path-to-output-directory"
+# example for CSV case
+--inputFileFormat="CSV"
+--outputFileFormat="CSV"
+--csvContainsHeaders="true"
+--nonTokenizedDeadLetterPath="path-to-errors-rows-writing"
+--batchSize=batch-size-number
+--rpcUri=http://host:port/tokenize
+```
+
+By default, this will run the pipeline locally with the DirectRunner. To change the runner, specify:
+
+```bash
+--runner=YOUR_SELECTED_RUNNER
+```
+
+See the [documentation](http://beam.apache.org/get-started/quickstart/) and
+the [Examples README](../../../../../../../../../README.md) for more information about how to run this example.
+
+## Running as a Dataflow Template
+
+This example also exists as a Google Dataflow Template, which you can build and run using Google Cloud Platform. See
+this template documentation [README.md](https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/v2/protegrity-data-tokenization/README.md) for

Review comment:
       Yes, we have [a PR](https://github.com/GoogleCloudPlatform/DataflowTemplates/pull/210) in the DataflowTemplates repository and it is focused on execution via Google Cloud Dataflow runner. The Beam version, on the other hand, is a bit more generic.
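
As a side note on the README hunk above: it lists `Ns`, `Nm`, and `Nh` as the allowed `windowDuration` formats. A parser for that format could look roughly like the sketch below. This is an assumption-laden illustration, not the template's actual parsing code: `WindowDurationSketch` is a hypothetical name, and the sketch assumes lowercase unit suffixes and a non-negative integer amount.

```java
import java.time.Duration;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative only: parses window durations written as Ns, Nm, or Nh. */
final class WindowDurationSketch {
  // Assumption: one or more digits followed by exactly one lowercase unit letter.
  private static final Pattern FORMAT = Pattern.compile("(\\d+)([smh])");

  static Duration parse(String value) {
    Matcher matcher = FORMAT.matcher(value.trim());
    if (!matcher.matches()) {
      throw new IllegalArgumentException(
          "Expected Ns, Nm, or Nh (for example 5s, 12m, 2h) but got: " + value);
    }
    long amount = Long.parseLong(matcher.group(1));
    switch (matcher.group(2)) {
      case "s":
        return Duration.ofSeconds(amount);
      case "m":
        return Duration.ofMinutes(amount);
      default:
        // The pattern only admits s, m, or h, so this branch is the "h" case.
        return Duration.ofHours(amount);
    }
  }
}
```

For instance, `WindowDurationSketch.parse("12m")` yields a `Duration` of 12 minutes, while an unsupported suffix such as `10x` is rejected with an `IllegalArgumentException`.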







[GitHub] [beam] pabloem commented on a change in pull request #13995: [BEAM-11322] Apache Beam Example to tokenize sensitive data

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #13995:
URL: https://github.com/apache/beam/pull/13995#discussion_r589778288



##########
File path: examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/README.md
##########
@@ -0,0 +1,169 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Apache Beam pipeline example to tokenize data using remote RPC server
+
+This directory contains an Apache Beam example that creates a pipeline to read data from one of
+the supported sources, tokenize data with external API calls to remote RPC server, and write data into one of the supported sinks.
+
+Supported data formats:
+
+- JSON
+- CSV
+
+Supported input sources:
+
+- File system
+- [Google Pub/Sub](https://cloud.google.com/pubsub)
+
+Supported destination sinks:
+
+- File system
+- [Google Cloud BigQuery](https://cloud.google.com/bigquery)
+- [Cloud BigTable](https://cloud.google.com/bigtable)
+
+Supported data schema format:
+
+- JSON with an array of fields described in BigQuery format
+
+In the main scenario, the template will create an Apache Beam pipeline that will read data in CSV or
+JSON format from a specified input source, send the data to an external processing server, receive
+processed data, and write it into a specified output sink.
+
+## Requirements
+
+- Java 8
+- One of the supported sources to read data from
+- One of the supported destination sinks to write data into
+- A configured RPC to tokenize data
+
+## Getting Started
+
+This section describes what is needed to get the template up and running.
+
+- Gradle preparation
+- Local execution
+- Running as a Dataflow Template
+    - Setting Up Project Environment
+    - Build Data Tokenization Dataflow Flex Template
+    - Creating the Dataflow Flex Template
+    - Executing Template
+
+## Gradle preparation
+
+To run this example your `build.gradle` file should contain the following task to execute the pipeline:
+
+```
+task execute (type:JavaExec) {
+    main = System.getProperty("mainClass")
+    classpath = sourceSets.main.runtimeClasspath
+    systemProperties System.getProperties()
+    args System.getProperty("exec.args", "").split()
+}
+```
+
+This task allows you to run the pipeline via the following command:
+
+```bash
+gradle clean execute -DmainClass=org.apache.beam.examples.complete.datatokenization.DataTokenization \
+     -Dexec.args="--<argument>=<value> --<argument>=<value>"
+```
+

Review comment:
       These instructions are nice and useful, but I worry that users will not find out about this example. Do you have plans to blog about it or add any extra documentation for it?

##########
File path: examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/DataProtectors.java
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.datatokenization.transforms;
+
+import static org.apache.beam.sdk.util.RowJsonUtils.rowToJson;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects.firstNonNull;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.beam.examples.complete.datatokenization.utils.FailsafeElement;
+import org.apache.beam.examples.complete.datatokenization.utils.FailsafeElementCoder;
+import org.apache.beam.sdk.coders.RowCoder;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.RowJson;
+import org.apache.beam.sdk.util.RowJsonUtils;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.gson.Gson;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.gson.JsonArray;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.gson.JsonObject;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ByteArrayEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The {@link DataProtectors} class contains transforms that buffer input rows into batches and
+ * send each batch once the count of buffered rows reaches the specified batch size. The last
+ * batch is sent after the last row arrives at the DoFn, even if the count of buffered rows is
+ * less than the batch size.
+ */
+public class DataProtectors {
+
+  /** Logger for class. */
+  private static final Logger LOG = LoggerFactory.getLogger(DataProtectors.class);
+
+  public static final String ID_FIELD_NAME = "ID";
+
+  /**
+   * The {@link RowToTokenizedRow} transform converts {@link Row} to {@link TableRow} objects. The
+   * transform accepts a {@link FailsafeElement} object so the original payload of the incoming
+   * record can be maintained across multiple series of transforms.
+   */
+  @AutoValue
+  public abstract static class RowToTokenizedRow<T>
+      extends PTransform<PCollection<KV<Integer, Row>>, PCollectionTuple> {
+
+    public static <T> Builder<T> newBuilder() {
+      return new AutoValue_DataProtectors_RowToTokenizedRow.Builder<>();
+    }
+
+    public abstract TupleTag<Row> successTag();
+
+    public abstract TupleTag<FailsafeElement<Row, Row>> failureTag();
+
+    public abstract Schema schema();
+
+    public abstract int batchSize();
+
+    public abstract String rpcURI();
+
+    @Override
+    public PCollectionTuple expand(PCollection<KV<Integer, Row>> inputRows) {
+      FailsafeElementCoder<Row, Row> coder =
+          FailsafeElementCoder.of(RowCoder.of(schema()), RowCoder.of(schema()));
+      PCollectionTuple pCollectionTuple =
+          inputRows.apply(
+              "Tokenize",
+              ParDo.of(new TokenizationFn(schema(), batchSize(), rpcURI(), failureTag()))
+                  .withOutputTags(successTag(), TupleTagList.of(failureTag())));
+      return PCollectionTuple.of(
+              successTag(), pCollectionTuple.get(successTag()).setRowSchema(schema()))
+          .and(failureTag(), pCollectionTuple.get(failureTag()).setCoder(coder));
+    }
+
+    /** Builder for {@link RowToTokenizedRow}. */
+    @AutoValue.Builder
+    public abstract static class Builder<T> {
+
+      public abstract Builder<T> setSuccessTag(TupleTag<Row> successTag);
+
+      public abstract Builder<T> setFailureTag(TupleTag<FailsafeElement<Row, Row>> failureTag);
+
+      public abstract Builder<T> setSchema(Schema schema);
+
+      public abstract Builder<T> setBatchSize(int batchSize);
+
+      public abstract Builder<T> setRpcURI(String rpcURI);
+
+      public abstract RowToTokenizedRow<T> build();
+    }
+  }
+
+  /** Class implements stateful doFn for data tokenization using remote RPC. */
+  @SuppressWarnings("initialization.static.fields.uninitialized")
+  public static class TokenizationFn extends DoFn<KV<Integer, Row>, Row> {
+
+    private static Schema schemaToRpc;
+    private static CloseableHttpClient httpclient;
+    private static ObjectMapper objectMapperSerializerForSchema;
+    private static ObjectMapper objectMapperDeserializerForSchema;
+
+    private final Schema schema;
+    private final int batchSize;
+    private final String rpcURI;
+    private final TupleTag<FailsafeElement<Row, Row>> failureTag;
+
+    @StateId("buffer")
+    private final StateSpec<BagState<Row>> bufferedEvents;
+
+    @StateId("count")
+    private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();
+
+    @TimerId("expiry")
+    private final TimerSpec expirySpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+    private Map<String, Row> inputRowsWithIds;
+
+    public TokenizationFn(
+        Schema schema,
+        int batchSize,
+        String rpcURI,
+        TupleTag<FailsafeElement<Row, Row>> failureTag) {
+      this.schema = schema;
+      this.batchSize = batchSize;
+      this.rpcURI = rpcURI;
+      bufferedEvents = StateSpecs.bag(RowCoder.of(schema));
+      this.failureTag = failureTag;
+      this.inputRowsWithIds = new HashMap<>();
+    }
+
+    @Setup
+    public void setup() {
+
+      List<Field> fields = schema.getFields();
+      fields.add(Field.of(ID_FIELD_NAME, FieldType.STRING));
+      schemaToRpc = new Schema(fields);
+
+      objectMapperSerializerForSchema =
+          RowJsonUtils.newObjectMapperWith(RowJson.RowJsonSerializer.forSchema(schemaToRpc));
+
+      objectMapperDeserializerForSchema =
+          RowJsonUtils.newObjectMapperWith(RowJson.RowJsonDeserializer.forSchema(schemaToRpc));
+
+      httpclient = HttpClients.createDefault();
+    }
+
+    @Teardown
+    public void close() {
+      try {
+        httpclient.close();
+      } catch (IOException exception) {
+        String exceptionMessage = exception.getMessage();
+        if (exceptionMessage != null) {
+          LOG.warn("Can't close connection: {}", exceptionMessage);
+        }
+      }
+    }
+
+    @OnTimer("expiry")
+    public void onExpiry(OnTimerContext context, @StateId("buffer") BagState<Row> bufferState) {
+      boolean isEmpty = firstNonNull(bufferState.isEmpty().read(), true);
+      if (!isEmpty) {
+        processBufferedRows(bufferState.read(), context);
+        bufferState.clear();
+      }
+    }
+
+    @ProcessElement
+    public void process(
+        ProcessContext context,
+        BoundedWindow window,
+        @StateId("buffer") BagState<Row> bufferState,
+        @StateId("count") ValueState<Integer> countState,
+        @TimerId("expiry") Timer expiryTimer) {
+
+      expiryTimer.set(window.maxTimestamp());
+
+      int count = firstNonNull(countState.read(), 0);
+      count++;
+      countState.write(count);
+      bufferState.add(context.element().getValue());
+
+      if (count >= batchSize) {
+        processBufferedRows(bufferState.read(), context);
+        bufferState.clear();
+        countState.clear();
+      }
+    }
+
+    @SuppressWarnings("argument.type.incompatible")
+    private void processBufferedRows(Iterable<Row> rows, WindowedContext context) {

Review comment:
       I see that this DoFn does a lot of its own buffering. Have you considered using GroupIntoBatches[1] for this? GroupIntoBatches has the same sort of buffering/counting/timer emission logic, but it receives more updates (e.g. it recently started supporting 'autosharding' which lets runners decouple the number of shards from the number of keys).
   
   Think about it - but I recommend you try using GroupIntoBatches, as you will get some extra nice benefits from it.
   
   [1] https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/GroupIntoBatches.html
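
For readers unfamiliar with the pattern under discussion, here is a minimal plain-Java model of the per-key, count-based buffering that the hand-written `TokenizationFn` performs and that `GroupIntoBatches` provides out of the box. It is only a sketch, not Beam code: `KeyedBatcher` and its methods are hypothetical names, it ignores windows, state and timers, and `flush()` merely stands in for the expiry timer that emits a final partial batch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Plain-Java model of per-key, count-based batching (hypothetical helper, not a Beam API). */
final class KeyedBatcher<K, V> {
  private final int batchSize;
  // One buffer per key; insertion order is kept so flush() output is deterministic.
  private final Map<K, List<V>> buffers = new LinkedHashMap<>();

  KeyedBatcher(int batchSize) {
    if (batchSize < 1) {
      throw new IllegalArgumentException("batchSize must be >= 1");
    }
    this.batchSize = batchSize;
  }

  /** Buffers one element and returns a full batch once the key's buffer reaches batchSize. */
  Optional<List<V>> add(K key, V value) {
    List<V> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
    buffer.add(value);
    if (buffer.size() < batchSize) {
      return Optional.empty();
    }
    buffers.remove(key); // the emitted batch is no longer buffered
    return Optional.of(buffer);
  }

  /** Emits every remaining partial batch, playing the role of the end-of-window timer. */
  List<List<V>> flush() {
    List<List<V>> remaining = new ArrayList<>(buffers.values());
    buffers.clear();
    return remaining;
  }
}
```

With a batch size of 2, the second `add` for a given key returns a full batch, and `flush` then returns whatever is still buffered for other keys. In a real pipeline this bookkeeping would be delegated to `GroupIntoBatches.ofSize(...)`, as suggested above.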

##########
File path: examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/FileSystemIO.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.datatokenization.transforms.io;
+
+import org.apache.beam.examples.complete.datatokenization.options.DataTokenizationOptions;
+import org.apache.beam.examples.complete.datatokenization.utils.CsvConverters;
+import org.apache.beam.examples.complete.datatokenization.utils.ErrorConverters;
+import org.apache.beam.examples.complete.datatokenization.utils.FailsafeElement;
+import org.apache.beam.examples.complete.datatokenization.utils.FailsafeElementCoder;
+import org.apache.beam.examples.complete.datatokenization.utils.RowToCsv;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ToJson;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** The {@link FileSystemIO} class to read/write data from/into File Systems. */
+public class FileSystemIO {

Review comment:
       Is this class meant to be generic, or specific to this template? I see the class is within the template package, so I am wondering whether we should name it TokenizationFileIO, or something else that states clearly that these transforms are meant to be used only for the data tokenization template.

##########
File path: examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/BigQueryIO.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.datatokenization.transforms.io;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.io.IOException;
+import org.apache.beam.examples.complete.datatokenization.utils.FailsafeElement;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
+import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
+import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** The {@link BigQueryIO} class for writing data from template to BigQuery. */
+public class BigQueryIO {

Review comment:
       Same as FileSystemIO

##########
File path: examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/README.md
##########
@@ -0,0 +1,169 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Apache Beam pipeline example to tokenize data using remote RPC server
+
+This directory contains an Apache Beam example that creates a pipeline to read data from one of
+the supported sources, tokenize data with external API calls to remote RPC server, and write data into one of the supported sinks.
+
+Supported data formats:
+
+- JSON
+- CSV
+
+Supported input sources:
+
+- File system
+- [Google Pub/Sub](https://cloud.google.com/pubsub)
+
+Supported destination sinks:
+
+- File system
+- [Google Cloud BigQuery](https://cloud.google.com/bigquery)
+- [Cloud BigTable](https://cloud.google.com/bigtable)
+
+Supported data schema format:
+
+- JSON with an array of fields described in BigQuery format
+
+In the main scenario, the template will create an Apache Beam pipeline that will read data in CSV or
+JSON format from a specified input source, send the data to an external processing server, receive
+processed data, and write it into a specified output sink.
+
+## Requirements
+
+- Java 8
+- One of the supported sources to read data from
+- One of the supported destination sinks to write data into
+- A configured RPC to tokenize data
+
+## Getting Started
+
+This section describes what is needed to get the template up and running.
+
+- Gradle preparation
+- Local execution
+- Running as a Dataflow Template
+    - Setting Up Project Environment
+    - Build Data Tokenization Dataflow Flex Template
+    - Creating the Dataflow Flex Template
+    - Executing Template
+
+## Gradle preparation
+
+To run this example your `build.gradle` file should contain the following task to execute the pipeline:
+
+```
+task execute (type:JavaExec) {
+    main = System.getProperty("mainClass")
+    classpath = sourceSets.main.runtimeClasspath
+    systemProperties System.getProperties()
+    args System.getProperty("exec.args", "").split()
+}
+```
+
+This task allows you to run the pipeline via the following command:
+
+```bash
+gradle clean execute -DmainClass=org.apache.beam.examples.complete.datatokenization.DataTokenization \
+     -Dexec.args="--<argument>=<value> --<argument>=<value>"
+```
+
+## Running the pipeline
+
+To execute this pipeline, specify the parameters:
+
+- Data schema
+    - **dataSchemaPath**: Path to data schema (JSON format) compatible with BigQuery.
+- 1 specified input source out of these:
+    - File System
+        - **inputFilePattern**: Filepattern for files to read data from
+        - **inputFileFormat**: File format of input files. Supported formats: JSON, CSV
+        - If the input data is in CSV format:
+            - **csvContainsHeaders**: `true` if file(s) in bucket to read data from contain headers,
+              and `false` otherwise
+            - **csvDelimiter**: Delimiting character in CSV. Default: use delimiter provided in
+              csvFormat
+            - **csvFormat**: Csv format according to Apache Commons CSV format. Default is:
+              [Apache Commons CSV default](https://static.javadoc.io/org.apache.commons/commons-csv/1.7/org/apache/commons/csv/CSVFormat.html#DEFAULT)
+              . Must match format names exactly found
+              at: https://static.javadoc.io/org.apache.commons/commons-csv/1.7/org/apache/commons/csv/CSVFormat.Predefined.html
+    - Google Pub/Sub
+        - **pubsubTopic**: The Cloud Pub/Sub topic to read from, in the format of '
+          projects/yourproject/topics/yourtopic'
+- 1 specified output sink out of these:
+    - File System
+        - **outputDirectory**: Directory to write data to
+        - **outputFileFormat**: File format of output files. Supported formats: JSON, CSV
+        - **windowDuration**: The window duration in which data will be written. Should be specified
+          only for 'Pub/Sub -> GCS' case. Defaults to 30s.
+
+          Allowed formats are:
+            - Ns (for seconds, example: 5s),
+            - Nm (for minutes, example: 12m),
+            - Nh (for hours, example: 2h).
+    - Google Cloud BigQuery
+        - **bigQueryTableName**: Cloud BigQuery table name to write into
+        - **tempLocation**: Folder in a Google Cloud Storage bucket, which is needed for
+          BigQuery to handle data writing
+    - Cloud BigTable
+        - **bigTableProjectId**: Id of the project where the Cloud BigTable instance to write into
+          is located
+        - **bigTableInstanceId**: Id of the Cloud BigTable instance to write into
+        - **bigTableTableId**: Id of the Cloud BigTable table to write into
+        - **bigTableKeyColumnName**: Column name to use as a key in Cloud BigTable
+        - **bigTableColumnFamilyName**: Column family name to use in Cloud BigTable
+- RPC server parameters
+    - **rpcUri**: URI for the API calls to RPC server
+    - **batchSize**: Size of the batch to send to RPC server per request
+
+The template allows for the user to supply the following optional parameter:
+
+- **nonTokenizedDeadLetterPath**: Folder where data that failed to tokenize will be stored
+
+
+in the following format:
+
+```bash
+--dataSchemaPath="path-to-data-schema-in-json-format"
+--inputFilePattern="path-pattern-to-input-data"
+--outputDirectory="path-to-output-directory"
+# example for CSV case
+--inputFileFormat="CSV"
+--outputFileFormat="CSV"
+--csvContainsHeaders="true"
+--nonTokenizedDeadLetterPath="path-to-errors-rows-writing"
+--batchSize=batch-size-number
+--rpcUri=http://host:port/tokenize
+```
+
+By default, this will run the pipeline locally with the DirectRunner. To change the runner, specify:
+
+```bash
+--runner=YOUR_SELECTED_RUNNER
+```
+
+See the [documentation](http://beam.apache.org/get-started/quickstart/) and
+the [Examples README](../../../../../../../../../README.md) for more information about how to run this example.
+
+## Running as a Dataflow Template
+
+This example also exists as a Google Dataflow Template, which you can build and run using Google Cloud Platform. See
+this template documentation [README.md](https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/v2/protegrity-data-tokenization/README.md) for

Review comment:
       I don't see anything at this address. Will it be added later?

Review comment:
       I am curious: how will the template here differ from the one in DataflowTemplates?

##########
File path: examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/BigTableIO.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.datatokenization.transforms.io;
+
+import com.google.bigtable.v2.Mutation;
+import com.google.protobuf.ByteString;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.beam.examples.complete.datatokenization.options.DataTokenizationOptions;
+import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
+import org.apache.beam.sdk.io.gcp.bigtable.BigtableWriteResult;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** The {@link BigTableIO} class for writing data from template to BigTable. */
+public class BigTableIO {

Review comment:
       Same question as with FileSystemIO




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] KhaninArtur commented on pull request #13995: [BEAM-11322] Apache Beam Example to tokenize sensitive data

Posted by GitBox <gi...@apache.org>.
KhaninArtur commented on pull request #13995:
URL: https://github.com/apache/beam/pull/13995#issuecomment-786488421


   Hi @pabloem! Yes, this PR is ready to be reviewed 👍🏼 





[GitHub] [beam] pabloem merged pull request #13995: [BEAM-11322] Apache Beam Example to tokenize sensitive data

Posted by GitBox <gi...@apache.org>.
pabloem merged pull request #13995:
URL: https://github.com/apache/beam/pull/13995


   





[GitHub] [beam] KhaninArtur commented on a change in pull request #13995: [BEAM-11322] Apache Beam Example to tokenize sensitive data

Posted by GitBox <gi...@apache.org>.
KhaninArtur commented on a change in pull request #13995:
URL: https://github.com/apache/beam/pull/13995#discussion_r598691216



##########
File path: examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/FileSystemIO.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.datatokenization.transforms.io;
+
+import org.apache.beam.examples.complete.datatokenization.options.DataTokenizationOptions;
+import org.apache.beam.examples.complete.datatokenization.utils.CsvConverters;
+import org.apache.beam.examples.complete.datatokenization.utils.ErrorConverters;
+import org.apache.beam.examples.complete.datatokenization.utils.FailsafeElement;
+import org.apache.beam.examples.complete.datatokenization.utils.FailsafeElementCoder;
+import org.apache.beam.examples.complete.datatokenization.utils.RowToCsv;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ToJson;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** The {@link FileSystemIO} class to read/write data from/into File Systems. */
+public class FileSystemIO {

Review comment:
       Got you, renamed it.

##########
File path: examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/BigTableIO.java
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.datatokenization.transforms.io;
+
+import com.google.bigtable.v2.Mutation;
+import com.google.protobuf.ByteString;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.beam.examples.complete.datatokenization.options.DataTokenizationOptions;
+import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
+import org.apache.beam.sdk.io.gcp.bigtable.BigtableWriteResult;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.Row;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** The {@link BigTableIO} class for writing data from template to BigTable. */
+public class BigTableIO {

Review comment:
       Done

##########
File path: examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/transforms/io/BigQueryIO.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.datatokenization.transforms.io;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.io.IOException;
+import org.apache.beam.examples.complete.datatokenization.utils.FailsafeElement;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
+import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
+import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** The {@link BigQueryIO} class for writing data from template to BigQuery. */
+public class BigQueryIO {

Review comment:
       Done







[GitHub] [beam] KhaninArtur commented on pull request #13995: [BEAM-11322] Apache Beam Example to tokenize sensitive data

Posted by GitBox <gi...@apache.org>.
KhaninArtur commented on pull request #13995:
URL: https://github.com/apache/beam/pull/13995#issuecomment-803977659


   retest this please





[GitHub] [beam] KhaninArtur commented on a change in pull request #13995: [BEAM-11322] Apache Beam Example to tokenize sensitive data

Posted by GitBox <gi...@apache.org>.
KhaninArtur commented on a change in pull request #13995:
URL: https://github.com/apache/beam/pull/13995#discussion_r594966044



##########
File path: examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/README.md
##########
@@ -0,0 +1,169 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Apache Beam pipeline example to tokenize data using a remote RPC server
+
+This directory contains an Apache Beam example that creates a pipeline to read data from one of
+the supported sources, tokenize the data through external API calls to a remote RPC server, and write the data into one of the supported sinks.
+
+Supported data formats:
+
+- JSON
+- CSV
+
+Supported input sources:
+
+- File system
+- [Google Pub/Sub](https://cloud.google.com/pubsub)
+
+Supported destination sinks:
+
+- File system
+- [Google Cloud BigQuery](https://cloud.google.com/bigquery)
+- [Cloud BigTable](https://cloud.google.com/bigtable)
+
+Supported data schema format:
+
+- JSON with an array of fields described in BigQuery format
+
+In the main scenario, the template will create an Apache Beam pipeline that will read data in CSV or
+JSON format from a specified input source, send the data to an external processing server, receive
+processed data, and write it into a specified output sink.
+
+## Requirements
+
+- Java 8
+- One of the supported sources to read data from
+- One of the supported destination sinks to write data into
+- A configured RPC server to tokenize data
+
+## Getting Started
+
+This section describes what is needed to get the template up and running.
+
+- Gradle preparation
+- Local execution
+- Running as a Dataflow Template
+    - Setting Up Project Environment
+    - Build Data Tokenization Dataflow Flex Template
+    - Creating the Dataflow Flex Template
+    - Executing Template
+
+## Gradle preparation
+
+To run this example, your `build.gradle` file should contain the following task to execute the pipeline:
+
+```
+task execute (type:JavaExec) {
+    main = System.getProperty("mainClass")
+    classpath = sourceSets.main.runtimeClasspath
+    systemProperties System.getProperties()
+    args System.getProperty("exec.args", "").split()
+}
+```
+
+This task lets you run the pipeline with the following command:
+
+```bash
+gradle clean execute -DmainClass=org.apache.beam.examples.complete.datatokenization.DataTokenization \
+     -Dexec.args="--<argument>=<value> --<argument>=<value>"
+```
+

Review comment:
       Yes, we plan to spread the word and blog about it.







[GitHub] [beam] pabloem commented on pull request #13995: [BEAM-11322] Apache Beam Example to tokenize sensitive data

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #13995:
URL: https://github.com/apache/beam/pull/13995#issuecomment-804241539


   taking a look today





[GitHub] [beam] KhaninArtur commented on pull request #13995: [BEAM-11322] Apache Beam Example to tokenize sensitive data

Posted by GitBox <gi...@apache.org>.
KhaninArtur commented on pull request #13995:
URL: https://github.com/apache/beam/pull/13995#issuecomment-803946707


   retest this please

