Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/10/19 18:26:13 UTC

[GitHub] [beam] Aliraza-N opened a new pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Aliraza-N opened a new pull request #13137:
URL: https://github.com/apache/beam/pull/13137


   We are looking to create a new Java pipeline connector to facilitate reading data from the Imaging API in GCP. Initially, this connector will be used to read study-level metadata of DICOM instances.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] poojavenkatram commented on a change in pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
poojavenkatram commented on a change in pull request #13137:
URL: https://github.com/apache/beam/pull/13137#discussion_r509433845



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+
+public class DicomIO {
+
+  /**
+   * Read dicom study metadata from a PCollection of resource webpath.
+   *
+   * @return the read

Review comment:
       DICOM study metadata?

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+
+public class DicomIO {
+
+  /**

Review comment:
       Why not move this comment onto the class to show what the class does? This method only returns an object of the class.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+
+public class DicomIO {

Review comment:
       Right at the top of this file I would add some information on what this connector is and what it does. Something like: this is used to connect to the Google Cloud Healthcare DICOM API, with links to the API, how to configure Pub/Sub, etc., to make it easy for a user of the connector to get some context.

##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIOTest.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class DicomIOTest {
+  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+
+  @Test
+  public void test_Dicom_failedMetadataRead() {

Review comment:
       How about a test for the success case?

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+
+public class DicomIO {
+
+  /**
+   * Read dicom study metadata from a PCollection of resource webpath.

Review comment:
       Read the DICOM study metadata from the resource path obtained from Pub/Sub?

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java
##########
@@ -196,6 +197,64 @@ public Empty deleteFhirStore(String name) throws IOException {
     return client.projects().locations().datasets().fhirStores().delete(name).execute();
   }
 
+  @Override
+  public String retrieveDicomStudyMetadata(String dicomWebPath) throws IOException {
+    String[] webPathSplit;
+    webPathSplit = dicomWebPath.split("/dicomWeb/");
+    String dicomStorePath = webPathSplit[0];
+
+    String[] searchParameters;
+    searchParameters = webPathSplit[1].split("/");
+    String studyId = searchParameters[1];
+    //        String seriesId = searchParameters[3];
+    //        String instanceId = searchParameters[5];

Review comment:
       nit: remove
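The hunk above indexes straight into the split arrays, so a malformed path would fail with an `ArrayIndexOutOfBoundsException`. A minimal sketch of a validating parser follows, in plain Java so it runs standalone. The class and method names are hypothetical, and the path layout (`<store path>/dicomWeb/studies/<study id>/...`) is an assumption inferred from the indices used in the diff, not something the PR states.

```java
/** Hypothetical helper, not part of the PR: parses a DICOMweb path defensively. */
final class DicomWebPathSketch {

  private static final String SEPARATOR = "/dicomWeb/";

  private DicomWebPathSketch() {}

  /** Returns everything before "/dicomWeb/", i.e. the DICOM store path. */
  static String dicomStorePath(String dicomWebPath) {
    return dicomWebPath.substring(0, separatorIndex(dicomWebPath));
  }

  /** Returns the segment that follows "studies/" after "/dicomWeb/". */
  static String studyId(String dicomWebPath) {
    int start = separatorIndex(dicomWebPath) + SEPARATOR.length();
    String[] parts = dicomWebPath.substring(start).split("/");
    // Assumed layout: parts[0] is the literal "studies", parts[1] is the study ID.
    if (parts.length < 2 || !"studies".equals(parts[0]) || parts[1].isEmpty()) {
      throw new IllegalArgumentException("No study ID in path: " + dicomWebPath);
    }
    return parts[1];
  }

  /** Locates "/dicomWeb/" or fails with a descriptive message. */
  private static int separatorIndex(String dicomWebPath) {
    int idx = dicomWebPath.indexOf(SEPARATOR);
    if (idx < 0) {
      throw new IllegalArgumentException("Missing " + SEPARATOR + " in path: " + dicomWebPath);
    }
    return idx;
  }
}
```

Failing early with an `IllegalArgumentException` that names the offending path should make bad Pub/Sub payloads easier to diagnose than an index error would.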

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+
+public class DicomIO {
+
+  /**
+   * Read dicom study metadata from a PCollection of resource webpath.
+   *
+   * @return the read
+   * @see ReadDicomStudyMetadata
+   */
+  public static ReadDicomStudyMetadata retrieveStudyMetadata() {
+    return new ReadDicomStudyMetadata();
+  }
+
+  /** The type ReadDicomStudyMetadata. */
+  public static class ReadDicomStudyMetadata
+      extends PTransform<PCollection<PubsubMessage>, PCollection<String>> {
+
+    public ReadDicomStudyMetadata() {}
+
+    public static final TupleTag<String> OUT = new TupleTag<String>() {};
+
+    /**
+     * DoFn to fetch the metadata of a study from a Dicom store based on it's location and study id.
+     */
+    static class FetchStudyMetadataFn extends DoFn<PubsubMessage, String> {
+
+      private HealthcareApiClient dicomStore;
+
+      /**
+       * Instantiate the healthcare client.
+       *
+       * @throws IOException
+       */
+      @Setup
+      public void instantiateHealthcareClient() throws IOException {
+        this.dicomStore = new HttpHealthcareApiClient();
+      }
+
+      /**
+       * Process The Pub/Sub message.
+       *
+       * @param context The input containing the pub/sub message
+       */
+      @ProcessElement
+      public void processElement(ProcessContext context) {
+        PubsubMessage msg = context.element();
+        byte[] msgPayload = msg.getPayload();
+        try {
+          String dicomWebPath = new String(msgPayload, "UTF-8");
+          String responseData = dicomStore.retrieveDicomStudyMetadata(dicomWebPath);
+          context.output(responseData);
+        } catch (Exception e) {
+          // IO exception, unsupported encoding exception
+          System.out.println(e);
+          if (e.getClass() == IOException.class) {
+          } else if (e.getClass() == UnsupportedEncodingException.class) {
+          } else {
+          }
+        }

Review comment:
       this seems incomplete?
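One way the handler could be completed, sketched in plain Java without Beam types so it runs standalone. `MetadataFetcher` and the two `Consumer` sinks are hypothetical stand-ins for the client call and for a main/error output pair; they are not the PR's API. Decoding with `StandardCharsets.UTF_8` removes the need to handle `UnsupportedEncodingException` at all, and a typed `catch` replaces the empty `getClass()` comparisons.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

/** Hypothetical sketch of the processing step; names are illustrative only. */
final class FetchMetadataSketch {

  /** Stand-in for the healthcare client call made in the hunk above. */
  interface MetadataFetcher {
    String fetch(String dicomWebPath) throws IOException;
  }

  private FetchMetadataSketch() {}

  /**
   * Decodes the payload, fetches the metadata, and sends the result to exactly one sink:
   * metadataOut on success, errorOut on failure.
   */
  static void process(
      byte[] payload,
      MetadataFetcher fetcher,
      Consumer<String> metadataOut,
      Consumer<String> errorOut) {
    // The Charset overload of the String constructor declares no checked exception.
    String dicomWebPath = new String(payload, StandardCharsets.UTF_8);
    String metadata;
    try {
      metadata = fetcher.fetch(dicomWebPath);
    } catch (IOException | IllegalArgumentException e) {
      // Record the failure instead of printing it and dropping the element.
      errorOut.accept(dicomWebPath + ": " + e);
      return;
    }
    metadataOut.accept(metadata);
  }
}
```

In a Beam `DoFn` the two sinks would presumably correspond to a main output and an additional tagged output, so that failed reads remain visible downstream.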







[GitHub] [beam] Aliraza-N edited a comment on pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
Aliraza-N edited a comment on pull request #13137:
URL: https://github.com/apache/beam/pull/13137#issuecomment-736086841


   > I think this broke Java Postcommit: https://ci-beam.apache.org/job/beam_PostCommit_Java/6904/
   > 
   > ```
   > java.nio.file.NoSuchFileException: src/test/resources/DICOM/testDicomFile.dcm
   > 	at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
   > 	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
   > 	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
   > 	at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
   > 	at java.nio.file.Files.newByteChannel(Files.java:361)
   > 	at java.nio.file.Files.newByteChannel(Files.java:407)
   > 	at java.nio.file.Files.readAllBytes(Files.java:3152)
   > 	at org.apache.beam.sdk.io.gcp.healthcare.HttpHealthcareApiClient.uploadToDicomStore(HttpHealthcareApiClient.java:229)
   > 	at org.apache.beam.sdk.io.gcp.healthcare.DicomIOReadIT.setup(DicomIOReadIT.java:55)
   > 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   > 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   > 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   > ```
   
   Working on a fix. In the meantime, I'll add the @Ignore tag.





[GitHub] [beam] Aliraza-N commented on pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
Aliraza-N commented on pull request #13137:
URL: https://github.com/apache/beam/pull/13137#issuecomment-736096808


   This PR has the change needed to ignore the test. At first glance, it appears that the test input file is not available during the run.
   https://github.com/apache/beam/pull/13440
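
One possible cause, not confirmed in this thread: the stack trace shows the file being read through a path relative to the working directory (`src/test/resources/...`), and that directory can differ between a local run and CI. A minimal sketch of the usual alternative follows, which resolves the file from the classpath. The class and method names are hypothetical, and the sketch assumes the build copies `src/test/resources` onto the test classpath.

```java
import java.io.ByteArrayOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical test helper, not part of the PR: reads a resource from the classpath. */
final class TestResourceSketch {

  private TestResourceSketch() {}

  /** Reads all bytes of a classpath resource such as "DICOM/testDicomFile.dcm". */
  static byte[] readResource(String name) throws IOException {
    // getResourceAsStream returns null, rather than throwing, when nothing matches.
    try (InputStream in = TestResourceSketch.class.getClassLoader().getResourceAsStream(name)) {
      if (in == null) {
        throw new FileNotFoundException("Classpath resource not found: " + name);
      }
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      byte[] buffer = new byte[8192];
      int read;
      while ((read = in.read(buffer)) != -1) {
        out.write(buffer, 0, read);
      }
      return out.toByteArray();
    }
  }
}
```

This only helps if the resource is actually packaged with the tests; if the `.dcm` file is excluded from the build, the lookup fails in the same way but with a clearer message.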





[GitHub] [beam] Aliraza-N edited a comment on pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
Aliraza-N edited a comment on pull request #13137:
URL: https://github.com/apache/beam/pull/13137#issuecomment-727005597


   All done!





[GitHub] [beam] Aliraza-N closed pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
Aliraza-N closed pull request #13137:
URL: https://github.com/apache/beam/pull/13137


   





[GitHub] [beam] pabloem commented on pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #13137:
URL: https://github.com/apache/beam/pull/13137#issuecomment-725753709









[GitHub] [beam] Aliraza-N edited a comment on pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
Aliraza-N edited a comment on pull request #13137:
URL: https://github.com/apache/beam/pull/13137#issuecomment-727005597


   All done! @pabloem 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] Aliraza-N commented on a change in pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
Aliraza-N commented on a change in pull request #13137:
URL: https://github.com/apache/beam/pull/13137#discussion_r512071470



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * The DicomIO connectors allows Beam pipelines to make calls to the Dicom API from Google Cloud
+ * Healthcare. https://cloud.google.com/healthcare/docs/concepts/dicom
+ */
+public class DicomIO {
+
+  /** The type ReadDicomStudyMetadata. */
+  public static class ReadDicomStudyMetadata
+      extends PTransform<PCollection<PubsubMessage>, DicomIO.ReadDicomStudyMetadata.Result> {
+
+    /**
+     * This class makes a call to the retrieve metadata endpoint
+     * (https://cloud.google.com/healthcare/docs/how-tos/dicomweb#retrieving_metadata It is
+     * expecting a PubSub message as input, where the message's body will contain the path to the
+     * study. You can learn how to configure PubSub messages to be published when an instance is
+     * stored by following: https://cloud.google.com/healthcare/docs/how-tos/pubsub. The connector
+     * will output a {@link ReadDicomStudyMetadata.Result} which will contain metadata of the study
+     * encoded as a json array.
+     */
+    public ReadDicomStudyMetadata() {}
+
+    /** TupleTag for the main output. */
+    public static final TupleTag<String> METADATA = new TupleTag<String>() {};
+    /** TupleTag for any error response. */
+    public static final TupleTag<String> ERROR_MESSAGE = new TupleTag<String>() {};
+
+    public static class Result implements POutput, PInput {
+      private PCollection<String> readResponse;
+
+      private PCollection<String> failedReads;
+
+      /** Contains both the response and error outputs from the transformation. */
+      PCollectionTuple pct;
+
+      /**
+       * Create DicomIO.ReadDicomStudyMetadata.Result form PCollectionTuple with OUT and DEAD_LETTER
+       * tags.
+       *
+       * @param pct the pct
+       * @return the read result
+       * @throws IllegalArgumentException the illegal argument exception
+       */
+      static DicomIO.ReadDicomStudyMetadata.Result of(PCollectionTuple pct)
+          throws IllegalArgumentException {
+        if (pct.getAll()
+            .keySet()
+            .containsAll((Collection<?>) TupleTagList.of(METADATA).and(ERROR_MESSAGE))) {
+          return new DicomIO.ReadDicomStudyMetadata.Result(pct);
+        } else {
+          throw new IllegalArgumentException(
+              "The PCollection tuple must have the DicomIO.ReadDicomStudyMetadata.OUT "

Review comment:
       Need to update error message




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] Aliraza-N commented on a change in pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
Aliraza-N commented on a change in pull request #13137:
URL: https://github.com/apache/beam/pull/13137#discussion_r510437467



##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIOTest.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class DicomIOTest {
+  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+
+  @Test
+  public void test_Dicom_failedMetadataRead() {
+    PubsubMessage badMessage = createPubSubMessage("foo");
+
+    DicomIO.ReadDicomStudyMetadata.Result retrievedData;
+    retrievedData =
+        pipeline.apply(Create.of(badMessage)).apply(new DicomIO.ReadDicomStudyMetadata());
+
+    PAssert.that(retrievedData.getReadResponse()).empty();

Review comment:
       Yes, you can call `retrievedData.getFailedReads()` to get the error responses. But that PCollection will not be empty.







[GitHub] [beam] Aliraza-N commented on pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
Aliraza-N commented on pull request #13137:
URL: https://github.com/apache/beam/pull/13137#issuecomment-727005597


   All done





[GitHub] [beam] Aliraza-N commented on pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
Aliraza-N commented on pull request #13137:
URL: https://github.com/apache/beam/pull/13137#issuecomment-723097687


   I can't figure out how to reply directly to your comment above, but yes, I did start working on an integration test. It just wasn't part of the previous commit.





[GitHub] [beam] pabloem commented on a change in pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #13137:
URL: https://github.com/apache/beam/pull/13137#discussion_r522332059



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * The DicomIO connectors allows Beam pipelines to make calls to the Dicom API of the Google Cloud
+ * Healthcare API (https://cloud.google.com/healthcare/docs/how-tos#dicom-guide).
+ *
+ * <h3>Reading Study-Level Metadata</h3>
+ *
+ * The study-level metadata for a dicom instance can be read with {@link ReadStudyMetadata}.
+ * Retrieve the metadata of a dicom instance given its store path as a string. This will return a
+ * {@link ReadStudyMetadata.Result}. You can fetch the successful calls using getReadResponse(), and
+ * any failed reads using getFailedReads().
+ *
+ * <h3>Example</h3>
+ *
+ * {@code Pipeline p = ... String webPath = ... DicomIO.ReadStudyMetadata.Result readMetadataResult
+ * = p .apply(Create.of(webPath)) PCollection<String> goodRead =
+ * readMetadataResult.getReadResponse() PCollection<String> failRead =
+ * readMetadataResult.getFailedReads() }
+ */
+public class DicomIO {
+
+  public static ReadStudyMetadata readStudyMetadata() {
+    return new ReadStudyMetadata();
+  }
+
+  /**
+   * This class makes a call to the retrieve metadata endpoint
+   * (https://cloud.google.com/healthcare/docs/how-tos/dicomweb#retrieving_metadata). It defines a
+   * function that can be used to process a Pubsub message from a DICOM store, read the DICOM study
+   * path and get the metadata of the specified study. You can learn how to configure PubSub
+   * messages to be published when an instance is stored by following:
+   * https://cloud.google.com/healthcare/docs/how-tos/pubsub. The connector will output a {@link
+   * ReadStudyMetadata.Result} which will contain metadata of the study encoded as a json array.
+   */
+  public static class ReadStudyMetadata
+      extends PTransform<PCollection<String>, ReadStudyMetadata.Result> {
+
+    private ReadStudyMetadata() {}
+
+    /** TupleTag for the main output. */
+    public static final TupleTag<String> METADATA = new TupleTag<String>() {};
+    /** TupleTag for any error response. */
+    public static final TupleTag<String> ERROR_MESSAGE = new TupleTag<String>() {};
+
+    public static class Result implements POutput, PInput {
+      private PCollection<String> readResponse;
+
+      private PCollection<String> failedReads;
+
+      /** Contains both the response and error outputs from the transformation. */
+      PCollectionTuple pct;
+
+      /**
+       * Create DicomIO.ReadStudyMetadata.Result from PCollectionTuple which contains the response
+       * (with METADATA and ERROR_MESSAGE tags).
+       *
+       * @param pct the pct
+       * @return the read result
+       * @throws IllegalArgumentException the illegal argument exception
+       */
+      static ReadStudyMetadata.Result of(PCollectionTuple pct) throws IllegalArgumentException {
+        if (pct.getAll()
+            .keySet()
+            .containsAll((Collection<?>) TupleTagList.of(METADATA).and(ERROR_MESSAGE))) {
+          return new ReadStudyMetadata.Result(pct);
+        } else {
+          throw new IllegalArgumentException(
+              "The PCollection tuple must have the DicomIO.ReadStudyMetadata.METADATA "
+                  + "and DicomIO.ReadStudyMetadata.ERROR_MESSAGE tuple tags");
+        }
+      }
+
+      private Result(PCollectionTuple pct) {
+        this.pct = pct;
+        this.readResponse = pct.get(METADATA);
+        this.failedReads = pct.get(ERROR_MESSAGE);
+      }
+
+      /**
+       * Gets failed reads.
+       *
+       * @return the failed reads
+       */
+      public PCollection<String> getFailedReads() {
+        return failedReads;
+      }
+
+      /**
+       * Gets resources.
+       *
+       * @return the resources
+       */
+      public PCollection<String> getReadResponse() {
+        return readResponse;
+      }
+
+      @Override
+      public Pipeline getPipeline() {
+        return this.pct.getPipeline();
+      }
+
+      @Override
+      public Map<TupleTag<?>, PValue> expand() {
+        return ImmutableMap.of(METADATA, readResponse);
+      }
+
+      @Override
+      public void finishSpecifyingOutput(
+          String transformName, PInput input, PTransform<?, ?> transform) {}
+    }
+
+    /**
+     * DoFn to fetch the metadata of a study from a Dicom store based on it's location and study id.
+     */
+    @SuppressWarnings({"nullness", "rawtypes"})
+    static class FetchStudyMetadataFn extends DoFn<String, String> {
+
+      private HealthcareApiClient dicomStore;
+
+      /**
+       * Instantiate the healthcare client.
+       *
+       * @throws IOException
+       */
+      @Setup
+      public void instantiateHealthcareClient() throws IOException {
+        if (dicomStore == null) {
+          this.dicomStore = new HttpHealthcareApiClient();
+        }
+      }
+
+      /**
+       * Process The Pub/Sub message.
+       *
+       * @param context The input containing the pub/sub message
+       */
+      @ProcessElement
+      public void processElement(ProcessContext context) {
+        String dicomWebPath = context.element();
+        try {
+          // TODO Change to non-blocking async calls

Review comment:
       Maybe - can you create a JIRA issue, and add `// TODO(BEAM-XXXX) Change to non-blocking async calls` here?
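
A minimal sketch of what the non-blocking variant mentioned in the TODO might look like, using only `java.util.concurrent.CompletableFuture`. The `AsyncFetchSketch` class, the `Outcome` holder, and the `fetcher` function are hypothetical stand-ins, not part of DicomIO or `HealthcareApiClient`, and wiring this into a Beam `DoFn` is not shown.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical illustration only; none of these names exist in the PR.
final class AsyncFetchSketch {

  /** Outcome of one fetch: exactly one of metadata or error is non-null. */
  static final class Outcome {
    final String metadata;
    final String error;

    Outcome(String metadata, String error) {
      this.metadata = metadata;
      this.error = error;
    }
  }

  /**
   * Runs the blocking fetcher on another thread and turns a thrown exception
   * into an error value instead of propagating it to the caller.
   */
  static CompletableFuture<Outcome> fetchAsync(
      String dicomWebPath, Function<String, String> fetcher) {
    return CompletableFuture.supplyAsync(() -> fetcher.apply(dicomWebPath))
        .handle(
            (metadata, failure) -> {
              if (failure == null) {
                return new Outcome(metadata, null);
              }
              // The failure is usually a CompletionException wrapping the real cause.
              Throwable cause = failure.getCause() != null ? failure.getCause() : failure;
              return new Outcome(null, String.valueOf(cause.getMessage()));
            });
  }
}
```

The two fields of `Outcome` mirror the METADATA and ERROR_MESSAGE outputs in the diff above: a caller would emit `metadata` to the main output and `error` to the error output once the future completes.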




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] suztomo commented on pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
suztomo commented on pull request #13137:
URL: https://github.com/apache/beam/pull/13137#issuecomment-813596393


   @Aliraza-N The DicomIOTest and other GCP healthcare service tests require default application credentials.
   
   > There are additional tests that suffer from this. They appear to be GCP integration tests, but have Test in the class name and so run under the :test task:
   > DicomIOTest.test_Dicom_failedMetadataRead
   > FhirIOTest.test_FhirIO_failedReads
   > FhirIOTest.test_FhirIO_failedWrites
   > HL7v2IOTest.test_HL7v2IO_failedReads
   > HL7v2IOTest.test_HL7v2IO_failedWrites
   
   https://issues.apache.org/jira/browse/BEAM-11363?focusedCommentId=17242773&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17242773
   
   Do you think they should be classified as integration tests?





[GitHub] [beam] TheNeuralBit commented on pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #13137:
URL: https://github.com/apache/beam/pull/13137#issuecomment-736089962


   Thank you!





[GitHub] [beam] Aliraza-N commented on pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
Aliraza-N commented on pull request #13137:
URL: https://github.com/apache/beam/pull/13137#issuecomment-712362610


   R: @poojavenkatram
   





[GitHub] [beam] pabloem commented on pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #13137:
URL: https://github.com/apache/beam/pull/13137#issuecomment-726223246


   Run Java PreCommit





[GitHub] [beam] Aliraza-N commented on pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
Aliraza-N commented on pull request #13137:
URL: https://github.com/apache/beam/pull/13137#issuecomment-726173789


   Run Java PreCommit





[GitHub] [beam] pabloem commented on a change in pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #13137:
URL: https://github.com/apache/beam/pull/13137#discussion_r522331585



##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIOReadIT.java
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import static org.apache.beam.sdk.io.gcp.healthcare.HL7v2IOTestUtil.HEALTHCARE_DATASET_TEMPLATE;
+
+import java.io.IOException;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+@SuppressWarnings({"nullness", "rawtypes", "uninitialized"})
+public class DicomIOReadIT {
+  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+
+  private String healthcareDataset;
+  private String project;
+  private HealthcareApiClient client;
+  private String storeName = "foo";
+
+  @Before
+  public void setup() throws IOException {
+    project =
+        TestPipeline.testingPipelineOptions()
+            .as(HealthcareStoreTestPipelineOptions.class)
+            .getStoreProjectId();
+    healthcareDataset = String.format(HEALTHCARE_DATASET_TEMPLATE, project);
+    client = new HttpHealthcareApiClient();
+
+    client.createDicomStore(healthcareDataset, storeName);
+  }
+
+  @After
+  public void deleteDicomStore() throws IOException {
+    client.deleteDicomStore(healthcareDataset + "/dicomStores/" + storeName);
+  }
+
+  @Test
+  public void testDicomMetadataRead() throws IOException {
+    String webPath =
+        String.format("%s/dicomStores/%s/dicomWeb/studies/", healthcareDataset, storeName);
+
+    DicomIO.ReadStudyMetadata.Result result =
+        pipeline.apply(Create.of(webPath)).apply(DicomIO.readStudyMetadata());

Review comment:
       Can you add a PAssert verification here, to verify that the expected message shows up?







[GitHub] [beam] Aliraza-N removed a comment on pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
Aliraza-N removed a comment on pull request #13137:
URL: https://github.com/apache/beam/pull/13137#issuecomment-724237629


   The tests are failing because of this in the build process:
   
   > Task :sdks:java:io:google-cloud-platform:compileJava
   /home/runner/work/beam/beam/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java:149: error: [initialization.fields.uninitialized] the constructor does not initialize fields: dicomStore
       static class FetchStudyMetadataFn extends DoFn<String, String> {
              ^
   /home/runner/work/beam/beam/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java:176: error: [argument.type.incompatible] incompatible argument for parameter output of output.
             context.output(ERROR_MESSAGE, e.getMessage());
                                                       ^
     found   : @Initialized @Nullable String
     required: @Initialized @NonNull String
   
   Neither seems to be a problem locally. The code in this context looks very similar to the FhirIO code, so I'm not sure why it's happening.
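
For context on the second error: `Throwable.getMessage()` may return null, which is why the nullness checker rejects passing it where a non-null `String` is required. Below is a minimal sketch of one way to guarantee a non-null value. The `ErrorMessages` class and `describe` helper are hypothetical, and this is not necessarily how the PR resolved the error.

```java
// Hypothetical helper, shown only to illustrate the nullable-message issue.
final class ErrorMessages {

  /** Returns the throwable's message, or its class name when the message is null. */
  static String describe(Throwable t) {
    String message = t.getMessage(); // null for e.g. new RuntimeException()
    return message != null ? message : t.getClass().getName();
  }
}
```

With a helper like this, the call flagged in the log could be written as `context.output(ERROR_MESSAGE, ErrorMessages.describe(e))`, so the value passed to the output is never null.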





[GitHub] [beam] poojavenkatram commented on a change in pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
poojavenkatram commented on a change in pull request #13137:
URL: https://github.com/apache/beam/pull/13137#discussion_r510404106



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * The DicomIO connectors allows Beam pipelines to read from the Dicom API from Google Cloud

Review comment:
       Add a link to the Dicom API here.

##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIOTest.java
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class DicomIOTest {
+  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+
+  @Test
+  public void test_Dicom_failedMetadataRead() {
+    PubsubMessage badMessage = createPubSubMessage("foo");
+
+    DicomIO.ReadDicomStudyMetadata.Result retrievedData;
+    retrievedData =
+        pipeline.apply(Create.of(badMessage)).apply(new DicomIO.ReadDicomStudyMetadata());
+
+    PAssert.that(retrievedData.getReadResponse()).empty();

Review comment:
       you can also check for the error in the result now?

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * The DicomIO connectors allows Beam pipelines to read from the Dicom API from Google Cloud
+ * Healthcare.
+ *
+ * <p>Study-level metadata can be read using {@link ReadDicomStudyMetadata}. It is expecting a
+ * PubSub message as input, where the message's body will contain the location of the Study. You can
+ * learn how to configure PubSub messages to be published when an instance is stored in a data store
+ * by following: https://cloud.google.com/healthcare/docs/how-tos/pubsub. The connector will output
+ * a {@link ReadDicomStudyMetadata.Result} which will contain metadata of a study encoded in json
+ * string
+ */
+public class DicomIO {
+
+  /** The type ReadDicomStudyMetadata. */
+  public static class ReadDicomStudyMetadata
+      extends PTransform<PCollection<PubsubMessage>, DicomIO.ReadDicomStudyMetadata.Result> {
+
+    public ReadDicomStudyMetadata() {}
+
+    public static final TupleTag<String> OUT = new TupleTag<String>() {};
+    public static final TupleTag<String> DEAD_LETTER = new TupleTag<String>() {};
+
+    public static class Result implements POutput, PInput {
+      private PCollection<String> readResponse;
+
+      private PCollection<String> failedReads;
+      /** The Pct. */

Review comment:
       please be more descriptive in the comment

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * The DicomIO connectors allows Beam pipelines to read from the Dicom API from Google Cloud
+ * Healthcare.
+ *
+ * <p>Study-level metadata can be read using {@link ReadDicomStudyMetadata}. It is expecting a
+ * PubSub message as input, where the message's body will contain the location of the Study. You can
+ * learn how to configure PubSub messages to be published when an instance is stored in a data store
+ * by following: https://cloud.google.com/healthcare/docs/how-tos/pubsub. The connector will output
+ * a {@link ReadDicomStudyMetadata.Result} which will contain metadata of a study encoded in json
+ * string
+ */
+public class DicomIO {
+
+  /** The type ReadDicomStudyMetadata. */
+  public static class ReadDicomStudyMetadata
+      extends PTransform<PCollection<PubsubMessage>, DicomIO.ReadDicomStudyMetadata.Result> {
+
+    public ReadDicomStudyMetadata() {}
+
+    public static final TupleTag<String> OUT = new TupleTag<String>() {};

Review comment:
       Why not give both of these better names, like "Response" and "Error"?

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * The DicomIO connectors allows Beam pipelines to read from the Dicom API from Google Cloud
+ * Healthcare.

Review comment:
       nit: "read from the Google Cloud Healthcare DICOM API"?

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * The DicomIO connectors allows Beam pipelines to read from the Dicom API from Google Cloud
+ * Healthcare.
+ *
+ * <p>Study-level metadata can be read using {@link ReadDicomStudyMetadata}. It is expecting a
+ * PubSub message as input, where the message's body will contain the location of the Study. You can
+ * learn how to configure PubSub messages to be published when an instance is stored in a data store
+ * by following: https://cloud.google.com/healthcare/docs/how-tos/pubsub. The connector will output
+ * a {@link ReadDicomStudyMetadata.Result} which will contain metadata of a study encoded in json

Review comment:
       study encoded as a JSON string?

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * The DicomIO connectors allows Beam pipelines to read from the Dicom API from Google Cloud
+ * Healthcare.
+ *
+ * <p>Study-level metadata can be read using {@link ReadDicomStudyMetadata}. It is expecting a
+ * PubSub message as input, where the message's body will contain the location of the Study. You can
+ * learn how to configure PubSub messages to be published when an instance is stored in a data store
+ * by following: https://cloud.google.com/healthcare/docs/how-tos/pubsub. The connector will output
+ * a {@link ReadDicomStudyMetadata.Result} which will contain metadata of a study encoded in json
+ * string
+ */
+public class DicomIO {
+
+  /** The type ReadDicomStudyMetadata. */

Review comment:
       You can move the description you gave for ReadStudyMetadata above to here, because we might add other methods to the same connector in the future.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * The DicomIO connectors allows Beam pipelines to read from the Dicom API from Google Cloud
+ * Healthcare.
+ *
+ * <p>Study-level metadata can be read using {@link ReadDicomStudyMetadata}. It is expecting a

Review comment:
       DICOM study metadata can be read?

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * The DicomIO connectors allows Beam pipelines to read from the Dicom API from Google Cloud
+ * Healthcare.
+ *
+ * <p>Study-level metadata can be read using {@link ReadDicomStudyMetadata}. It is expecting a
+ * PubSub message as input, where the message's body will contain the location of the Study. You can

Review comment:
       nit: contain the path to the study. 

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * The DicomIO connectors allows Beam pipelines to read from the Dicom API from Google Cloud
+ * Healthcare.
+ *
+ * <p>Study-level metadata can be read using {@link ReadDicomStudyMetadata}. It is expecting a
+ * PubSub message as input, where the message's body will contain the location of the Study. You can
+ * learn how to configure PubSub messages to be published when an instance is stored in a data store

Review comment:
       when a DICOM instance is stored by following..




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pabloem commented on pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #13137:
URL: https://github.com/apache/beam/pull/13137#issuecomment-721362998


   Run Java PreCommit





[GitHub] [beam] pabloem commented on a change in pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #13137:
URL: https://github.com/apache/beam/pull/13137#discussion_r518394131



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * The DicomIO connectors allows Beam pipelines to make calls to the Dicom API of the Google Cloud
+ * Healthcare API (https://cloud.google.com/healthcare/docs/how-tos#dicom-guide).
+ */
+public class DicomIO {
+
+  /**
+   * This class makes a call to the retrieve metadata endpoint
+   * (https://cloud.google.com/healthcare/docs/how-tos/dicomweb#retrieving_metadata). It defines a
+   * function that can be used to process a Pubsub message from a DICOM store, read the DICOM study
+   * path and get the metadata of the specified study. You can learn how to configure PubSub
+   * messages to be published when an instance is stored by following:
+   * https://cloud.google.com/healthcare/docs/how-tos/pubsub. The connector will output a {@link
+   * ReadDicomStudyMetadata.Result} which will contain metadata of the study encoded as a json
+   * array.
+   */
+  public static class ReadDicomStudyMetadata
+      extends PTransform<PCollection<PubsubMessage>, DicomIO.ReadDicomStudyMetadata.Result> {
+
+    public ReadDicomStudyMetadata() {}
+
+    /** TupleTag for the main output. */
+    public static final TupleTag<String> METADATA = new TupleTag<String>() {};
+    /** TupleTag for any error response. */
+    public static final TupleTag<String> ERROR_MESSAGE = new TupleTag<String>() {};
+
+    public static class Result implements POutput, PInput {
+      private PCollection<String> readResponse;
+
+      private PCollection<String> failedReads;
+
+      /** Contains both the response and error outputs from the transformation. */
+      PCollectionTuple pct;
+
+      /**
+       * Create DicomIO.ReadDicomStudyMetadata.Result from PCollectionTuple which contains the
+       * response (with METADATA and ERROR_MESSAGE tags).
+       *
+       * @param pct the pct
+       * @return the read result
+       * @throws IllegalArgumentException the illegal argument exception
+       */
+      static DicomIO.ReadDicomStudyMetadata.Result of(PCollectionTuple pct)
+          throws IllegalArgumentException {
+        if (pct.getAll()
+            .keySet()
+            .containsAll((Collection<?>) TupleTagList.of(METADATA).and(ERROR_MESSAGE))) {
+          return new DicomIO.ReadDicomStudyMetadata.Result(pct);
+        } else {
+          throw new IllegalArgumentException(
+              "The PCollection tuple must have the DicomIO.ReadDicomStudyMetadata.METADATA "
+                  + "and DicomIO.ReadDicomStudyMetadata.ERROR_MESSAGE tuple tags");
+        }
+      }
+
+      private Result(PCollectionTuple pct) {
+        this.pct = pct;
+        this.readResponse = pct.get(METADATA);
+        this.failedReads = pct.get(ERROR_MESSAGE);
+      }
+
+      /**
+       * Gets failed reads.
+       *
+       * @return the failed reads
+       */
+      public PCollection<String> getFailedReads() {
+        return failedReads;
+      }
+
+      /**
+       * Gets resources.
+       *
+       * @return the resources
+       */
+      public PCollection<String> getReadResponse() {
+        return readResponse;
+      }
+
+      @Override
+      public Pipeline getPipeline() {
+        return this.pct.getPipeline();
+      }
+
+      @Override
+      public Map<TupleTag<?>, PValue> expand() {
+        return ImmutableMap.of(METADATA, readResponse);
+      }
+
+      @Override
+      public void finishSpecifyingOutput(
+          String transformName, PInput input, PTransform<?, ?> transform) {}
+    }
+
+    /**
+     * DoFn to fetch the metadata of a study from a Dicom store based on it's location and study id.
+     */
+    static class FetchStudyMetadataFn extends DoFn<PubsubMessage, String> {
+
+      private HealthcareApiClient dicomStore;
+
+      /**
+       * Instantiate the healthcare client.
+       *
+       * @throws IOException
+       */
+      @Setup
+      public void instantiateHealthcareClient() throws IOException {
+        this.dicomStore = new HttpHealthcareApiClient();
+      }
+
+      /**
+       * Process The Pub/Sub message.
+       *
+       * @param context The input containing the pub/sub message
+       */
+      @ProcessElement
+      public void processElement(ProcessContext context) {
+        PubsubMessage msg = context.element();
+        byte[] msgPayload = msg.getPayload();
+        try {
+          String dicomWebPath = new String(msgPayload, "UTF-8");
+          String responseData = dicomStore.retrieveDicomStudyMetadata(dicomWebPath);
+          context.output(METADATA, responseData);

Review comment:
       okay can you add a TODO there?
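
For readers following the quoted `processElement`: the pattern is to decode each payload as a UTF-8 path, attempt the fetch, and route the outcome to either the metadata output or the error output. The sketch below shows that routing in plain Java without any Beam types. Everything in it (`ReadRoutingSketch`, `MetadataFetcher`, `Outputs`) is a hypothetical stand-in and not the connector's API. It also uses `StandardCharsets.UTF_8`, which avoids the checked `UnsupportedEncodingException` that `new String(bytes, "UTF-8")` declares.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ReadRoutingSketch {

  /** Two result lists, loosely mirroring the METADATA and ERROR_MESSAGE tags. */
  static final class Outputs {
    final List<String> metadata = new ArrayList<>();
    final List<String> errors = new ArrayList<>();
  }

  /** Hypothetical stand-in for the healthcare client call (assumption). */
  interface MetadataFetcher {
    String fetch(String dicomWebPath) throws IOException;
  }

  /** Decodes each payload as a UTF-8 path and routes the outcome to one list. */
  static Outputs process(List<byte[]> payloads, MetadataFetcher fetcher) {
    Outputs out = new Outputs();
    for (byte[] payload : payloads) {
      // A Charset constant needs no try/catch for the encoding name.
      String path = new String(payload, StandardCharsets.UTF_8);
      try {
        out.metadata.add(fetcher.fetch(path));
      } catch (IOException e) {
        // A failed read goes to the error list instead of failing the batch.
        out.errors.add(e.getMessage());
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<byte[]> payloads =
        Arrays.asList(
            "stores/a/studies/ok".getBytes(StandardCharsets.UTF_8),
            "stores/a/studies/missing".getBytes(StandardCharsets.UTF_8));
    // Fake fetcher for illustration only: fails for paths ending in "missing".
    Outputs out =
        process(
            payloads,
            path -> {
              if (path.endsWith("missing")) {
                throw new IOException("not found: " + path);
              }
              return "[]";
            });
    System.out.println("metadata: " + out.metadata);
    System.out.println("errors: " + out.errors);
  }
}
```

Keeping failures in a separate list means one bad path does not abort the remaining reads, which is the same motivation as the two tagged outputs in the quoted transform.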







[GitHub] [beam] pabloem commented on a change in pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #13137:
URL: https://github.com/apache/beam/pull/13137#discussion_r518393717



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java
##########
@@ -37,9 +36,27 @@
 /**
  * The DicomIO connectors allows Beam pipelines to make calls to the Dicom API of the Google Cloud
  * Healthcare API (https://cloud.google.com/healthcare/docs/how-tos#dicom-guide).
+ *
+ * <h3>Reading Study-Level Metadata</h3>
+ *
+ * The study-level metadata for a dicom instance can be read with {@link ReadDicomStudyMetadata}.
+ * Retrieve the metadata of a dicom instance given its store path as a string. This will return a
+ * {@link ReadDicomStudyMetadata.Result}. You can fetch the successful calls using
+ * getReadResponse(), and any failed reads using getFailedReads().
+ *
+ * <h3>Example</h3>
+ *
+ * {@code Pipeline p = ... String webPath = ... DicomIO.ReadDicomStudyMetadata.Result
+ * readMetadataResult = p .apply(Create.of(webPath)) PCollection<String> goodRead =
+ * readMetadataResult.getReadResponse() PCollection<String> failRead =
+ * readMetadataResult.getFailedReads() }
  */
 public class DicomIO {
 
+  public static ReadDicomStudyMetadata readDicomStudyMetadata() {

Review comment:
       ok one more question - does it make sense to just call it `readStudyMetadata` since we're already in DicomIO? Up to you.







[GitHub] [beam] pabloem commented on pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #13137:
URL: https://github.com/apache/beam/pull/13137#issuecomment-816098437


   Hmm, I think these should indeed be run as integration tests (ITs), and only when application default credentials are available.





[GitHub] [beam] pabloem merged pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
pabloem merged pull request #13137:
URL: https://github.com/apache/beam/pull/13137


   





[GitHub] [beam] Aliraza-N commented on a change in pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
Aliraza-N commented on a change in pull request #13137:
URL: https://github.com/apache/beam/pull/13137#discussion_r518770361



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java
##########
@@ -37,9 +36,27 @@
 /**
  * The DicomIO connectors allows Beam pipelines to make calls to the Dicom API of the Google Cloud
  * Healthcare API (https://cloud.google.com/healthcare/docs/how-tos#dicom-guide).
+ *
+ * <h3>Reading Study-Level Metadata</h3>
+ *
+ * The study-level metadata for a dicom instance can be read with {@link ReadDicomStudyMetadata}.
+ * Retrieve the metadata of a dicom instance given its store path as a string. This will return a
+ * {@link ReadDicomStudyMetadata.Result}. You can fetch the successful calls using
+ * getReadResponse(), and any failed reads using getFailedReads().
+ *
+ * <h3>Example</h3>
+ *
+ * {@code Pipeline p = ... String webPath = ... DicomIO.ReadDicomStudyMetadata.Result
+ * readMetadataResult = p .apply(Create.of(webPath)) PCollection<String> goodRead =
+ * readMetadataResult.getReadResponse() PCollection<String> failRead =
+ * readMetadataResult.getFailedReads() }
  */
 public class DicomIO {
 
+  public static ReadDicomStudyMetadata readDicomStudyMetadata() {

Review comment:
       Makes sense to me







[GitHub] [beam] TheNeuralBit commented on pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #13137:
URL: https://github.com/apache/beam/pull/13137#issuecomment-736080729


   I think this broke Java Postcommit: https://ci-beam.apache.org/job/beam_PostCommit_Java/6904/
   
   ```
   java.nio.file.NoSuchFileException: src/test/resources/DICOM/testDicomFile.dcm
   	at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
   	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
   	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
   	at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
   	at java.nio.file.Files.newByteChannel(Files.java:361)
   	at java.nio.file.Files.newByteChannel(Files.java:407)
   	at java.nio.file.Files.readAllBytes(Files.java:3152)
   	at org.apache.beam.sdk.io.gcp.healthcare.HttpHealthcareApiClient.uploadToDicomStore(HttpHealthcareApiClient.java:229)
   	at org.apache.beam.sdk.io.gcp.healthcare.DicomIOReadIT.setup(DicomIOReadIT.java:55)
   	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   ```
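
The trace above ends in `Files.readAllBytes` on the relative path `src/test/resources/DICOM/testDicomFile.dcm`. A relative path is resolved against the working directory of the JVM, so one plausible reading (an assumption; the thread does not confirm the cause) is that the postcommit job runs from a different directory than a local run, or that the resource is not present there. The sketch below, with the hypothetical names `RelativePathSketch` and `readOrExplain`, shows how reporting the resolved absolute path makes that kind of failure easier to diagnose.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;

class RelativePathSketch {

  /** Reads a file; if it is missing, reports the absolute path that was tried. */
  static byte[] readOrExplain(String filePath) throws IOException {
    Path path = Paths.get(filePath);
    try {
      return Files.readAllBytes(path);
    } catch (NoSuchFileException e) {
      // toAbsolutePath() resolves a relative path against the working directory.
      throw new IOException("File not found; resolved to " + path.toAbsolutePath(), e);
    }
  }

  public static void main(String[] args) throws IOException {
    // A file that exists: write three bytes to a temp directory and read them back.
    Path dir = Files.createTempDirectory("dicom-sketch");
    Path file = dir.resolve("testDicomFile.dcm");
    Files.write(file, new byte[] {1, 2, 3});
    System.out.println("bytes read: " + readOrExplain(file.toString()).length);

    // A relative path that is assumed not to exist in the current directory.
    try {
      readOrExplain("no-such-dir-for-sketch/testDicomFile.dcm");
    } catch (IOException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Loading test data through the classpath instead of a file-system path is another common way to make a test independent of the working directory. Whether that fits this IT is a separate question the thread does not settle.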





[GitHub] [beam] Aliraza-N commented on pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
Aliraza-N commented on pull request #13137:
URL: https://github.com/apache/beam/pull/13137#issuecomment-725437612


   Run Java PreCommit





[GitHub] [beam] pabloem commented on pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #13137:
URL: https://github.com/apache/beam/pull/13137#issuecomment-726897771


   hm okay, feel free to re-add the suppression. Sorry about the trouble! And just the one comment about PAssert. LGTM after that.





[GitHub] [beam] Aliraza-N commented on a change in pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
Aliraza-N commented on a change in pull request #13137:
URL: https://github.com/apache/beam/pull/13137#discussion_r523113422



##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIOReadIT.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import static org.apache.beam.sdk.io.gcp.healthcare.HL7v2IOTestUtil.HEALTHCARE_DATASET_TEMPLATE;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+@SuppressWarnings({"nullness", "rawtypes", "uninitialized"})
+public class DicomIOReadIT {
+  private static final String TEST_FILE_PATH = "src/test/resources/DICOM/testDicomFile.dcm";
+  private static final String TEST_FILE_STUDY_ID = "study_000000000";
+  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+
+  private String healthcareDataset;
+  private String project;
+  private HealthcareApiClient client;
+  private String storeName = "foo";
+
+  @Before
+  public void setup() throws IOException, URISyntaxException {
+    project =
+        TestPipeline.testingPipelineOptions()
+            .as(HealthcareStoreTestPipelineOptions.class)
+            .getStoreProjectId();
+    healthcareDataset = String.format(HEALTHCARE_DATASET_TEMPLATE, project);
+    client = new HttpHealthcareApiClient();
+
+    client.createDicomStore(healthcareDataset, storeName);
+    client.uploadToDicomStore(healthcareDataset + "/dicomStores/" + storeName, TEST_FILE_PATH);
+  }
+
+  @After
+  public void deleteDicomStore() throws IOException {
+    client.deleteDicomStore(healthcareDataset + "/dicomStores/" + storeName);
+  }
+
+  @Test
+  public void testDicomMetadataRead() throws IOException {
+    String webPath =
+        String.format(
+            "%s/dicomStores/%s/dicomWeb/studies/%s",
+            healthcareDataset, storeName, TEST_FILE_STUDY_ID);
+
+    DicomIO.ReadStudyMetadata.Result result =
+        pipeline.apply(Create.of(webPath)).apply(DicomIO.readStudyMetadata());
+
+    PAssert.that(result.getFailedReads()).empty();

Review comment:
       Makes sense to me! 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] pabloem commented on a change in pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #13137:
URL: https://github.com/apache/beam/pull/13137#discussion_r523110870



##########
File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIOReadIT.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import static org.apache.beam.sdk.io.gcp.healthcare.HL7v2IOTestUtil.HEALTHCARE_DATASET_TEMPLATE;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+@SuppressWarnings({"nullness", "rawtypes", "uninitialized"})
+public class DicomIOReadIT {
+  private static final String TEST_FILE_PATH = "src/test/resources/DICOM/testDicomFile.dcm";
+  private static final String TEST_FILE_STUDY_ID = "study_000000000";
+  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+
+  private String healthcareDataset;
+  private String project;
+  private HealthcareApiClient client;
+  private String storeName = "foo";
+
+  @Before
+  public void setup() throws IOException, URISyntaxException {
+    project =
+        TestPipeline.testingPipelineOptions()
+            .as(HealthcareStoreTestPipelineOptions.class)
+            .getStoreProjectId();
+    healthcareDataset = String.format(HEALTHCARE_DATASET_TEMPLATE, project);
+    client = new HttpHealthcareApiClient();
+
+    client.createDicomStore(healthcareDataset, storeName);
+    client.uploadToDicomStore(healthcareDataset + "/dicomStores/" + storeName, TEST_FILE_PATH);
+  }
+
+  @After
+  public void deleteDicomStore() throws IOException {
+    client.deleteDicomStore(healthcareDataset + "/dicomStores/" + storeName);
+  }
+
+  @Test
+  public void testDicomMetadataRead() throws IOException {
+    String webPath =
+        String.format(
+            "%s/dicomStores/%s/dicomWeb/studies/%s",
+            healthcareDataset, storeName, TEST_FILE_STUDY_ID);
+
+    DicomIO.ReadStudyMetadata.Result result =
+        pipeline.apply(Create.of(webPath)).apply(DicomIO.readStudyMetadata());
+
+    PAssert.that(result.getFailedReads()).empty();

Review comment:
       Does it make sense to add a positive PAssert as well? Verifying that the content you inserted is still there?
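
A minimal sketch of what such a positive check could test, written in plain Java so it stands alone. The class and method names are hypothetical and not part of the PR, and it assumes the returned metadata JSON contains the study identifier as a literal string. In a Beam test, a predicate like this could be called from `PAssert.that(result.getReadResponse()).satisfies(...)`.

```java
import java.util.List;

/** Hypothetical helper for a positive assertion; not part of the PR. */
final class MetadataCheckSketch {

  /**
   * Returns true only if at least one response was read and every response
   * mentions the expected study identifier. Assumes (unverified) that the
   * identifier appears verbatim in the metadata JSON.
   */
  static boolean everyResponseMentionsStudy(Iterable<String> readResponses, String studyId) {
    boolean sawAny = false;
    for (String json : readResponses) {
      sawAny = true;
      if (!json.contains(studyId)) {
        return false;
      }
    }
    // An empty result must not count as success, otherwise the check is not "positive".
    return sawAny;
  }
}
```

Rejecting an empty result matters here: asserting only that failed reads are empty would also pass if nothing was read at all.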







[GitHub] [beam] aaltay commented on pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
aaltay commented on pull request #13137:
URL: https://github.com/apache/beam/pull/13137#issuecomment-824491163


   > hmmm I think these should be run as IT tests indeed, and only whenever application default creds are available
   
   Who could make this change?





[GitHub] [beam] poojavenkatram commented on a change in pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
poojavenkatram commented on a change in pull request #13137:
URL: https://github.com/apache/beam/pull/13137#discussion_r512143891



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * The DicomIO connectors allows Beam pipelines to make calls to the Dicom API of the Google Cloud
+ * Healthcare API: https://cloud.google.com/healthcare/docs/how-tos#dicom-guide
+ */
+public class DicomIO {
+
+  /**
+   * This class makes a call to the retrieve metadata endpoint
+   * (https://cloud.google.com/healthcare/docs/how-tos/dicomweb#retrieving_metadata). It defines a function that can
+   * be used to process a Pubsub message from a DICOM store, read the DICOM study path and get the metadata of the
+   * specified study.
+   * You can learn how to configure PubSub messages to be published when an instance is
+   * stored by following: https://cloud.google.com/healthcare/docs/how-tos/pubsub. The connector
+   * will output a {@link ReadDicomStudyMetadata.Result} which will contain metadata of the study
+   * encoded as a json array.
+   */
+  public static class ReadDicomStudyMetadata
+      extends PTransform<PCollection<PubsubMessage>, DicomIO.ReadDicomStudyMetadata.Result> {
+
+    public ReadDicomStudyMetadata() {}
+
+    /** TupleTag for the main output. */
+    public static final TupleTag<String> METADATA = new TupleTag<String>() {};
+    /** TupleTag for any error response. */
+    public static final TupleTag<String> ERROR_MESSAGE = new TupleTag<String>() {};
+
+    public static class Result implements POutput, PInput {
+      private PCollection<String> readResponse;
+
+      private PCollection<String> failedReads;
+
+      /** Contains both the response and error outputs from the transformation. */
+      PCollectionTuple pct;
+
+      /**
+       * Create DicomIO.ReadDicomStudyMetadata.Result form PCollectionTuple with OUT and DEAD_LETTER

Review comment:
       Typo: "from" instead of "form". Please also update the tag names here (OUT and DEAD_LETTER no longer match the code).

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * The DicomIO connectors allows Beam pipelines to make calls to the Dicom API of the Google Cloud
+ * Healthcare API: https://cloud.google.com/healthcare/docs/how-tos#dicom-guide
+ */
+public class DicomIO {
+
+  /**
+   * This class makes a call to the retrieve metadata endpoint
+   * (https://cloud.google.com/healthcare/docs/how-tos/dicomweb#retrieving_metadata). It defines a function that can
+   * be used to process a Pubsub message from a DICOM store, read the DICOM study path and get the metadata of the
+   * specified study.
+   * You can learn how to configure PubSub messages to be published when an instance is
+   * stored by following: https://cloud.google.com/healthcare/docs/how-tos/pubsub. The connector
+   * will output a {@link ReadDicomStudyMetadata.Result} which will contain metadata of the study
+   * encoded as a json array.
+   */
+  public static class ReadDicomStudyMetadata
+      extends PTransform<PCollection<PubsubMessage>, DicomIO.ReadDicomStudyMetadata.Result> {
+
+    public ReadDicomStudyMetadata() {}
+
+    /** TupleTag for the main output. */
+    public static final TupleTag<String> METADATA = new TupleTag<String>() {};
+    /** TupleTag for any error response. */
+    public static final TupleTag<String> ERROR_MESSAGE = new TupleTag<String>() {};
+
+    public static class Result implements POutput, PInput {
+      private PCollection<String> readResponse;
+
+      private PCollection<String> failedReads;
+
+      /** Contains both the response and error outputs from the transformation. */
+      PCollectionTuple pct;
+
+      /**
+       * Create DicomIO.ReadDicomStudyMetadata.Result form PCollectionTuple with OUT and DEAD_LETTER
+       * tags.
+       *
+       * @param pct the pct

Review comment:
       the PCollectionTuple which contains the response

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HttpHealthcareApiClient.java
##########
@@ -196,6 +197,69 @@ public Empty deleteFhirStore(String name) throws IOException {
     return client.projects().locations().datasets().fhirStores().delete(name).execute();
   }
 
+  @Override
+  public String retrieveDicomStudyMetadata(String dicomWebPath) throws IOException {
+    String[] webPathSplit;
+    webPathSplit = dicomWebPath.split("/dicomWeb/");
+    if (webPathSplit.length != 2) {
+      throw new IOException("Invalid Web Path");
+    }
+
+    String dicomStorePath = webPathSplit[0];
+
+    String[] searchParameters;
+    searchParameters = webPathSplit[1].split("/");
+    if (searchParameters.length < 2) {
+      throw new IOException("Invalid Web Path");

Review comment:
       Suggest a more specific message here: "Invalid DICOM web path".
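
A minimal sketch of the same validation with a more specific message, assuming the second path element after "/dicomWeb/" is the study id. The class `DicomWebPathSketch` and method `parseStudyPath` are hypothetical and not part of the PR; only the split on "/dicomWeb/" and the two length checks mirror the quoted diff.

```java
import java.io.IOException;

/** Hypothetical helper; not part of the PR. Mirrors the checks in the quoted diff. */
final class DicomWebPathSketch {

  /**
   * Splits a DICOMweb study path into {dicomStorePath, studyId}.
   * Treating searchParameters[1] as the study id is an assumption.
   */
  static String[] parseStudyPath(String dicomWebPath) throws IOException {
    String[] webPathSplit = dicomWebPath.split("/dicomWeb/");
    if (webPathSplit.length != 2) {
      // Include the offending path so the failure is easy to trace.
      throw new IOException(
          "Invalid DICOM web path (expected exactly one \"/dicomWeb/\" segment): " + dicomWebPath);
    }

    String[] searchParameters = webPathSplit[1].split("/");
    if (searchParameters.length < 2) {
      throw new IOException(
          "Invalid DICOM web path (expected \"studies/<study id>\" after \"/dicomWeb/\"): "
              + dicomWebPath);
    }
    return new String[] {webPathSplit[0], searchParameters[1]};
  }
}
```

Putting the rejected path in the message lets a failed element in the error output be matched back to its input without extra logging.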







[GitHub] [beam] Aliraza-N commented on a change in pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
Aliraza-N commented on a change in pull request #13137:
URL: https://github.com/apache/beam/pull/13137#discussion_r522474275



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * The DicomIO connectors allows Beam pipelines to make calls to the Dicom API of the Google Cloud
+ * Healthcare API (https://cloud.google.com/healthcare/docs/how-tos#dicom-guide).
+ *
+ * <h3>Reading Study-Level Metadata</h3>
+ *
+ * The study-level metadata for a dicom instance can be read with {@link ReadStudyMetadata}.
+ * Retrieve the metadata of a dicom instance given its store path as a string. This will return a
+ * {@link ReadStudyMetadata.Result}. You can fetch the successful calls using getReadResponse(), and
+ * any failed reads using getFailedReads().
+ *
+ * <h3>Example</h3>
+ *
+ * {@code Pipeline p = ... String webPath = ... DicomIO.ReadStudyMetadata.Result readMetadataResult
+ * = p .apply(Create.of(webPath)) PCollection<String> goodRead =
+ * readMetadataResult.getReadResponse() PCollection<String> failRead =
+ * readMetadataResult.getFailedReads() }
+ */
+public class DicomIO {
+
+  public static ReadStudyMetadata readStudyMetadata() {
+    return new ReadStudyMetadata();
+  }
+
+  /**
+   * This class makes a call to the retrieve metadata endpoint
+   * (https://cloud.google.com/healthcare/docs/how-tos/dicomweb#retrieving_metadata). It defines a
+   * function that can be used to process a Pubsub message from a DICOM store, read the DICOM study
+   * path and get the metadata of the specified study. You can learn how to configure PubSub
+   * messages to be published when an instance is stored by following:
+   * https://cloud.google.com/healthcare/docs/how-tos/pubsub. The connector will output a {@link
+   * ReadStudyMetadata.Result} which will contain metadata of the study encoded as a json array.
+   */
+  public static class ReadStudyMetadata
+      extends PTransform<PCollection<String>, ReadStudyMetadata.Result> {
+
+    private ReadStudyMetadata() {}
+
+    /** TupleTag for the main output. */
+    public static final TupleTag<String> METADATA = new TupleTag<String>() {};
+    /** TupleTag for any error response. */
+    public static final TupleTag<String> ERROR_MESSAGE = new TupleTag<String>() {};
+
+    public static class Result implements POutput, PInput {
+      private PCollection<String> readResponse;
+
+      private PCollection<String> failedReads;
+
+      /** Contains both the response and error outputs from the transformation. */
+      PCollectionTuple pct;
+
+      /**
+       * Create DicomIO.ReadStudyMetadata.Result from PCollectionTuple which contains the response
+       * (with METADATA and ERROR_MESSAGE tags).
+       *
+       * @param pct the pct
+       * @return the read result
+       * @throws IllegalArgumentException the illegal argument exception
+       */
+      static ReadStudyMetadata.Result of(PCollectionTuple pct) throws IllegalArgumentException {
+        if (pct.getAll()
+            .keySet()
+            .containsAll((Collection<?>) TupleTagList.of(METADATA).and(ERROR_MESSAGE))) {
+          return new ReadStudyMetadata.Result(pct);
+        } else {
+          throw new IllegalArgumentException(
+              "The PCollection tuple must have the DicomIO.ReadStudyMetadata.METADATA "
+                  + "and DicomIO.ReadStudyMetadata.ERROR_MESSAGE tuple tags");
+        }
+      }
+
+      private Result(PCollectionTuple pct) {
+        this.pct = pct;
+        this.readResponse = pct.get(METADATA);
+        this.failedReads = pct.get(ERROR_MESSAGE);
+      }
+
+      /**
+       * Gets failed reads.
+       *
+       * @return the failed reads
+       */
+      public PCollection<String> getFailedReads() {
+        return failedReads;
+      }
+
+      /**
+       * Gets resources.
+       *
+       * @return the resources
+       */
+      public PCollection<String> getReadResponse() {
+        return readResponse;
+      }
+
+      @Override
+      public Pipeline getPipeline() {
+        return this.pct.getPipeline();
+      }
+
+      @Override
+      public Map<TupleTag<?>, PValue> expand() {
+        return ImmutableMap.of(METADATA, readResponse);
+      }
+
+      @Override
+      public void finishSpecifyingOutput(
+          String transformName, PInput input, PTransform<?, ?> transform) {}
+    }
+
+    /**
+     * DoFn to fetch the metadata of a study from a Dicom store based on it's location and study id.
+     */
+    @SuppressWarnings({"nullness", "rawtypes"})
+    static class FetchStudyMetadataFn extends DoFn<String, String> {
+
+      private HealthcareApiClient dicomStore;

Review comment:
       Hmm, compileJava is still failing for me.







[GitHub] [beam] Aliraza-N commented on pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
Aliraza-N commented on pull request #13137:
URL: https://github.com/apache/beam/pull/13137#issuecomment-734320929


   Hey, sorry this PR has been sitting here for a while. I have a couple more commits (refactoring, cleanup) for this PR. It should be at the tail end of its life. I'll push these commits and notify the reviewers in a few hours.





[GitHub] [beam] Aliraza-N commented on a change in pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
Aliraza-N commented on a change in pull request #13137:
URL: https://github.com/apache/beam/pull/13137#discussion_r509559499



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+
+public class DicomIO {
+
+  /**
+   * Read dicom study metadata from a PCollection of resource webpath.
+   *
+   * @return the read
+   * @see ReadDicomStudyMetadata
+   */
+  public static ReadDicomStudyMetadata retrieveStudyMetadata() {

Review comment:
       We are not really doing much in this method, so we can remove it 







[GitHub] [beam] Aliraza-N commented on pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
Aliraza-N commented on pull request #13137:
URL: https://github.com/apache/beam/pull/13137#issuecomment-736086841


   > I think this broke Java Postcommit: https://ci-beam.apache.org/job/beam_PostCommit_Java/6904/
   > 
   > ```
   > java.nio.file.NoSuchFileException: src/test/resources/DICOM/testDicomFile.dcm
   > 	at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
   > 	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
   > 	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
   > 	at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
   > 	at java.nio.file.Files.newByteChannel(Files.java:361)
   > 	at java.nio.file.Files.newByteChannel(Files.java:407)
   > 	at java.nio.file.Files.readAllBytes(Files.java:3152)
   > 	at org.apache.beam.sdk.io.gcp.healthcare.HttpHealthcareApiClient.uploadToDicomStore(HttpHealthcareApiClient.java:229)
   > 	at org.apache.beam.sdk.io.gcp.healthcare.DicomIOReadIT.setup(DicomIOReadIT.java:55)
   > 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   > 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   > 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   > ```
   
   Working on a fix. In the meantime, I'll add the @Ignore tag.
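
The trace shows a relative path failing, which depends on the working directory the test runs in. One common way to avoid that dependency, not necessarily the fix adopted for this PR, is to resolve the file through the classpath. A minimal sketch; the class and method names are hypothetical, and `readAllBytes` requires Java 9 or later:

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical helper; not part of the PR. */
final class TestResourceSketch {

  /** Reads a classpath resource fully, failing with a descriptive message if it is absent. */
  static byte[] readClasspathResource(String name) throws IOException {
    // getResourceAsStream returns null (it does not throw) when the resource is missing.
    try (InputStream in = TestResourceSketch.class.getClassLoader().getResourceAsStream(name)) {
      if (in == null) {
        throw new FileNotFoundException("Resource not found on classpath: " + name);
      }
      return in.readAllBytes();
    }
  }
}
```

With a lookup like this, a file under `src/test/resources` would be addressed by its path relative to that directory, for example `DICOM/testDicomFile.dcm`, assuming the build puts that directory on the test classpath.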





[GitHub] [beam] pabloem commented on a change in pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #13137:
URL: https://github.com/apache/beam/pull/13137#discussion_r522334301



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * The DicomIO connectors allows Beam pipelines to make calls to the Dicom API of the Google Cloud
+ * Healthcare API (https://cloud.google.com/healthcare/docs/how-tos#dicom-guide).
+ *
+ * <h3>Reading Study-Level Metadata</h3>
+ *
+ * The study-level metadata for a dicom instance can be read with {@link ReadStudyMetadata}.
+ * Retrieve the metadata of a dicom instance given its store path as a string. This will return a
+ * {@link ReadStudyMetadata.Result}. You can fetch the successful calls using getReadResponse(), and
+ * any failed reads using getFailedReads().
+ *
+ * <h3>Example</h3>
+ *
+ * {@code Pipeline p = ... String webPath = ... DicomIO.ReadStudyMetadata.Result readMetadataResult
+ * = p .apply(Create.of(webPath)) PCollection<String> goodRead =
+ * readMetadataResult.getReadResponse() PCollection<String> failRead =
+ * readMetadataResult.getFailedReads() }
+ */
+public class DicomIO {
+
+  public static ReadStudyMetadata readStudyMetadata() {
+    return new ReadStudyMetadata();
+  }
+
+  /**
+   * This class makes a call to the retrieve metadata endpoint
+   * (https://cloud.google.com/healthcare/docs/how-tos/dicomweb#retrieving_metadata). It defines a
+   * function that can be used to process a Pubsub message from a DICOM store, read the DICOM study
+   * path and get the metadata of the specified study. You can learn how to configure PubSub
+   * messages to be published when an instance is stored by following:
+   * https://cloud.google.com/healthcare/docs/how-tos/pubsub. The connector will output a {@link
+   * ReadStudyMetadata.Result} which will contain metadata of the study encoded as a json array.
+   */
+  public static class ReadStudyMetadata
+      extends PTransform<PCollection<String>, ReadStudyMetadata.Result> {
+
+    private ReadStudyMetadata() {}
+
+    /** TupleTag for the main output. */
+    public static final TupleTag<String> METADATA = new TupleTag<String>() {};
+    /** TupleTag for any error response. */
+    public static final TupleTag<String> ERROR_MESSAGE = new TupleTag<String>() {};
+
+    public static class Result implements POutput, PInput {
+      private PCollection<String> readResponse;
+
+      private PCollection<String> failedReads;
+
+      /** Contains both the response and error outputs from the transformation. */
+      PCollectionTuple pct;
+
+      /**
+       * Create DicomIO.ReadStudyMetadata.Result from PCollectionTuple which contains the response
+       * (with METADATA and ERROR_MESSAGE tags).
+       *
+       * @param pct the pct
+       * @return the read result
+       * @throws IllegalArgumentException the illegal argument exception
+       */
+      static ReadStudyMetadata.Result of(PCollectionTuple pct) throws IllegalArgumentException {
+        if (pct.getAll()
+            .keySet()
+            .containsAll((Collection<?>) TupleTagList.of(METADATA).and(ERROR_MESSAGE))) {
+          return new ReadStudyMetadata.Result(pct);
+        } else {
+          throw new IllegalArgumentException(
+              "The PCollection tuple must have the DicomIO.ReadStudyMetadata.METADATA "
+                  + "and DicomIO.ReadStudyMetadata.ERROR_MESSAGE tuple tags");
+        }
+      }
+
+      private Result(PCollectionTuple pct) {
+        this.pct = pct;
+        this.readResponse = pct.get(METADATA);
+        this.failedReads = pct.get(ERROR_MESSAGE);
+      }
+
+      /**
+       * Gets failed reads.
+       *
+       * @return the failed reads
+       */
+      public PCollection<String> getFailedReads() {
+        return failedReads;
+      }
+
+      /**
+       * Gets resources.
+       *
+       * @return the resources
+       */
+      public PCollection<String> getReadResponse() {
+        return readResponse;
+      }
+
+      @Override
+      public Pipeline getPipeline() {
+        return this.pct.getPipeline();
+      }
+
+      @Override
+      public Map<TupleTag<?>, PValue> expand() {
+        return ImmutableMap.of(METADATA, readResponse);
+      }
+
+      @Override
+      public void finishSpecifyingOutput(
+          String transformName, PInput input, PTransform<?, ?> transform) {}
+    }
+
+    /**
+     * DoFn to fetch the metadata of a study from a Dicom store based on it's location and study id.
+     */
+    @SuppressWarnings({"nullness", "rawtypes"})
+    static class FetchStudyMetadataFn extends DoFn<String, String> {
+
+      private HealthcareApiClient dicomStore;

Review comment:
       Maybe you also need to add null checks in process... up to you.
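
A minimal sketch of such null checks, in plain Java so it stands alone. The class name is hypothetical, and a `Function<String, String>` stands in for `HealthcareApiClient`; the real DoFn lifecycle methods and signatures in the PR may differ.

```java
import java.util.Objects;
import java.util.function.Function;

/** Hypothetical stand-in for the DoFn in the quoted diff; not part of the PR. */
final class NullCheckedFetchSketch {

  // Stand-in for the HealthcareApiClient field; null until setup has run.
  private Function<String, String> client;

  /** Plays the role of the setup step that creates the client. */
  void setup(Function<String, String> client) {
    this.client = client;
  }

  /** Plays the role of the per-element process step. */
  String process(String dicomWebPath) {
    // Fail fast with a clear message instead of an anonymous NullPointerException later.
    Objects.requireNonNull(client, "client is not initialized; setup must run before process");
    Objects.requireNonNull(dicomWebPath, "dicomWebPath must not be null");
    return client.apply(dicomWebPath);
  }
}
```

Explicit checks like these could also make it possible to narrow the "nullness" warning suppression on the class, though that depends on how the checker is configured.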







[GitHub] [beam] Aliraza-N commented on pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
Aliraza-N commented on pull request #13137:
URL: https://github.com/apache/beam/pull/13137#issuecomment-723267534


   Run Java PreCommit





[GitHub] [beam] Aliraza-N commented on a change in pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
Aliraza-N commented on a change in pull request #13137:
URL: https://github.com/apache/beam/pull/13137#discussion_r510437760



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * The DicomIO connectors allows Beam pipelines to read from the Dicom API from Google Cloud
+ * Healthcare.

Review comment:
       Would "make calls" be better?







[GitHub] [beam] Aliraza-N commented on pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
Aliraza-N commented on pull request #13137:
URL: https://github.com/apache/beam/pull/13137#issuecomment-724042925


    retest this please





[GitHub] [beam] poojavenkatram commented on a change in pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
poojavenkatram commented on a change in pull request #13137:
URL: https://github.com/apache/beam/pull/13137#discussion_r512068306



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java
##########
@@ -0,0 +1,177 @@
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * The DicomIO connectors allows Beam pipelines to make calls to the Dicom API from Google Cloud
+ * Healthcare. https://cloud.google.com/healthcare/docs/concepts/dicom
+ */
+public class DicomIO {
+
+  /** The type ReadDicomStudyMetadata. */
+  public static class ReadDicomStudyMetadata
+      extends PTransform<PCollection<PubsubMessage>, DicomIO.ReadDicomStudyMetadata.Result> {
+
+    /**
+     * This class makes a call to the retrieve metadata endpoint
+     * (https://cloud.google.com/healthcare/docs/how-tos/dicomweb#retrieving_metadata It is

Review comment:
       Add a `).` after the link.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java
##########
@@ -0,0 +1,177 @@
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * The DicomIO connectors allows Beam pipelines to make calls to the Dicom API from Google Cloud
+ * Healthcare. https://cloud.google.com/healthcare/docs/concepts/dicom
+ */
+public class DicomIO {
+
+  /** The type ReadDicomStudyMetadata. */

Review comment:
       Add the class documentation here. The general rule of thumb is to put the description and all details about the class with the class definition, and then give each method its own specific description.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java
##########
@@ -0,0 +1,177 @@
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * The DicomIO connectors allows Beam pipelines to make calls to the Dicom API from Google Cloud
+ * Healthcare. https://cloud.google.com/healthcare/docs/concepts/dicom

Review comment:
       "allows Beam pipelines to make calls to the DICOM API of the Google Cloud Healthcare API: https://cloud.google.com/healthcare/docs/how-tos#dicom-guide"? This link is better because it points to the how-to guide, which is much easier for someone to follow.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java
##########
@@ -0,0 +1,177 @@
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * The DicomIO connectors allows Beam pipelines to make calls to the Dicom API from Google Cloud
+ * Healthcare. https://cloud.google.com/healthcare/docs/concepts/dicom
+ */
+public class DicomIO {
+
+  /** The type ReadDicomStudyMetadata. */
+  public static class ReadDicomStudyMetadata
+      extends PTransform<PCollection<PubsubMessage>, DicomIO.ReadDicomStudyMetadata.Result> {
+
+    /**
+     * This class makes a call to the retrieve metadata endpoint
+     * (https://cloud.google.com/healthcare/docs/how-tos/dicomweb#retrieving_metadata It is
+     * expecting a PubSub message as input, where the message's body will contain the path to the

Review comment:
       It defines a function that can be used to process a Pubsub message from a DICOM store, read the DICOM study path and get the metadata of the specified study.

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java
##########
@@ -0,0 +1,172 @@
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * The DicomIO connectors allows Beam pipelines to read from the Dicom API from Google Cloud
+ * Healthcare.
+ *
+ * <p>Study-level metadata can be read using {@link ReadDicomStudyMetadata}. It is expecting a
+ * PubSub message as input, where the message's body will contain the location of the Study. You can
+ * learn how to configure PubSub messages to be published when an instance is stored in a data store
+ * by following: https://cloud.google.com/healthcare/docs/how-tos/pubsub. The connector will output
+ * a {@link ReadDicomStudyMetadata.Result} which will contain metadata of a study encoded in json
+ * string
+ */
+public class DicomIO {
+
+  /** The type ReadDicomStudyMetadata. */
+  public static class ReadDicomStudyMetadata
+      extends PTransform<PCollection<PubsubMessage>, DicomIO.ReadDicomStudyMetadata.Result> {
+
+    public ReadDicomStudyMetadata() {}
+
+    public static final TupleTag<String> OUT = new TupleTag<String>() {};

Review comment:
       That's OK, but these names make it difficult to understand what the values are without going through the code, which is not ideal, so we should rename them.







[GitHub] [beam] pabloem commented on a change in pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #13137:
URL: https://github.com/apache/beam/pull/13137#discussion_r516951894



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java
##########
@@ -0,0 +1,177 @@
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * The DicomIO connectors allows Beam pipelines to make calls to the Dicom API of the Google Cloud
+ * Healthcare API (https://cloud.google.com/healthcare/docs/how-tos#dicom-guide).
+ */
+public class DicomIO {
+
+  /**
+   * This class makes a call to the retrieve metadata endpoint
+   * (https://cloud.google.com/healthcare/docs/how-tos/dicomweb#retrieving_metadata). It defines a
+   * function that can be used to process a Pubsub message from a DICOM store, read the DICOM study
+   * path and get the metadata of the specified study. You can learn how to configure PubSub
+   * messages to be published when an instance is stored by following:
+   * https://cloud.google.com/healthcare/docs/how-tos/pubsub. The connector will output a {@link
+   * ReadDicomStudyMetadata.Result} which will contain metadata of the study encoded as a json
+   * array.
+   */
+  public static class ReadDicomStudyMetadata
+      extends PTransform<PCollection<PubsubMessage>, DicomIO.ReadDicomStudyMetadata.Result> {

Review comment:
       Is it possible to read study metadata coming from a source other than Pubsub? Perhaps a batch pipeline that reads a set of files?
   
   I am wondering if it makes sense to fix the input to a Pubsub Message. It sounds like you need a `String` containing a web path - wouldn't it make sense to receive a PCollection of String, so that users don't have to always pass a Pubsub message?
   
   Another option is to make this a `PTransform<PCollection<T>, DicomIO.ReadDicomStudyMetadata.Result>`, and have a configuration parameter `SimpleFunction<T, String> dicomPathFunction` that users can pass to obtain the dicom path from the input object, but that feels like overkill : )
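
The two options above can be sketched in plain Java, without Beam, to show the shape of the API. `StudyPathReader`, `ofPaths`, `withPathFunction`, and `extractPaths` are hypothetical names used only for illustration; in the real connector the class would be a `PTransform` and the function would likely be a Beam `SimpleFunction` or `SerializableFunction` rather than `java.util.function.Function`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical stand-in for a transform that is generic over its input type. */
final class StudyPathReader<T> {
  private final Function<T, String> dicomPathFunction;

  private StudyPathReader(Function<T, String> dicomPathFunction) {
    this.dicomPathFunction = dicomPathFunction;
  }

  /** Option 1: the inputs are already DICOM web paths. */
  static StudyPathReader<String> ofPaths() {
    return new StudyPathReader<>(Function.identity());
  }

  /** Option 2: the caller supplies a function that extracts the path from each input. */
  static <T> StudyPathReader<T> withPathFunction(Function<T, String> dicomPathFunction) {
    return new StudyPathReader<>(dicomPathFunction);
  }

  /** Applies the configured function to every input, preserving order. */
  List<String> extractPaths(List<T> inputs) {
    List<String> paths = new ArrayList<>();
    for (T input : inputs) {
      paths.add(dicomPathFunction.apply(input));
    }
    return paths;
  }
}
```

With this shape, a Pub/Sub-driven pipeline would pass a function that decodes the message payload, while a batch pipeline reading paths from files could use the plain-string variant.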

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java
##########
@@ -0,0 +1,177 @@
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * The DicomIO connectors allows Beam pipelines to make calls to the Dicom API of the Google Cloud
+ * Healthcare API (https://cloud.google.com/healthcare/docs/how-tos#dicom-guide).
+ */
+public class DicomIO {
+
+  /**
+   * This class makes a call to the retrieve metadata endpoint
+   * (https://cloud.google.com/healthcare/docs/how-tos/dicomweb#retrieving_metadata). It defines a
+   * function that can be used to process a Pubsub message from a DICOM store, read the DICOM study
+   * path and get the metadata of the specified study. You can learn how to configure PubSub
+   * messages to be published when an instance is stored by following:
+   * https://cloud.google.com/healthcare/docs/how-tos/pubsub. The connector will output a {@link
+   * ReadDicomStudyMetadata.Result} which will contain metadata of the study encoded as a json
+   * array.
+   */
+  public static class ReadDicomStudyMetadata
+      extends PTransform<PCollection<PubsubMessage>, DicomIO.ReadDicomStudyMetadata.Result> {
+
+    public ReadDicomStudyMetadata() {}

Review comment:
       Transforms in Beam are almost never meant to be created via their constructor. I would recommend using a static class-level function (e.g. `DicomIO.readStudyMetadata()`), and making the constructor private.
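
A minimal sketch of the static-factory pattern recommended here, written without Beam so it compiles on its own. `DicomConnector` is a hypothetical stand-in for `DicomIO`, and `describe()` exists only so the example has something observable to call.

```java
/** Hypothetical, Beam-free stand-in for the connector class. */
final class DicomConnector {
  private DicomConnector() {}

  /** Entry point that users call instead of invoking a constructor. */
  static ReadStudyMetadata readStudyMetadata() {
    return new ReadStudyMetadata();
  }

  /** Stand-in for the transform; in the PR this would extend PTransform. */
  static final class ReadStudyMetadata {
    // Private, so instances can only be obtained through the factory method above.
    private ReadStudyMetadata() {}

    String describe() {
      return "ReadStudyMetadata";
    }
  }
}
```

Keeping the constructor private leaves room to add configuration methods later without changing how callers obtain the transform.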

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java
##########
@@ -0,0 +1,177 @@
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * The DicomIO connectors allows Beam pipelines to make calls to the Dicom API of the Google Cloud
+ * Healthcare API (https://cloud.google.com/healthcare/docs/how-tos#dicom-guide).
+ */
+public class DicomIO {
+
+  /**
+   * This class makes a call to the retrieve metadata endpoint
+   * (https://cloud.google.com/healthcare/docs/how-tos/dicomweb#retrieving_metadata). It defines a
+   * function that can be used to process a Pubsub message from a DICOM store, read the DICOM study
+   * path and get the metadata of the specified study. You can learn how to configure PubSub
+   * messages to be published when an instance is stored by following:
+   * https://cloud.google.com/healthcare/docs/how-tos/pubsub. The connector will output a {@link
+   * ReadDicomStudyMetadata.Result} which will contain metadata of the study encoded as a json
+   * array.
+   */
+  public static class ReadDicomStudyMetadata
+      extends PTransform<PCollection<PubsubMessage>, DicomIO.ReadDicomStudyMetadata.Result> {
+
+    public ReadDicomStudyMetadata() {}
+
+    /** TupleTag for the main output. */
+    public static final TupleTag<String> METADATA = new TupleTag<String>() {};
+    /** TupleTag for any error response. */
+    public static final TupleTag<String> ERROR_MESSAGE = new TupleTag<String>() {};
+
+    public static class Result implements POutput, PInput {
+      private PCollection<String> readResponse;
+
+      private PCollection<String> failedReads;
+
+      /** Contains both the response and error outputs from the transformation. */
+      PCollectionTuple pct;
+
+      /**
+       * Create DicomIO.ReadDicomStudyMetadata.Result from PCollectionTuple which contains the
+       * response (with METADATA and ERROR_MESSAGE tags).
+       *
+       * @param pct the pct
+       * @return the read result
+       * @throws IllegalArgumentException the illegal argument exception
+       */
+      static DicomIO.ReadDicomStudyMetadata.Result of(PCollectionTuple pct)
+          throws IllegalArgumentException {
+        if (pct.getAll()
+            .keySet()
+            .containsAll((Collection<?>) TupleTagList.of(METADATA).and(ERROR_MESSAGE))) {
+          return new DicomIO.ReadDicomStudyMetadata.Result(pct);
+        } else {
+          throw new IllegalArgumentException(
+              "The PCollection tuple must have the DicomIO.ReadDicomStudyMetadata.METADATA "
+                  + "and DicomIO.ReadDicomStudyMetadata.ERROR_MESSAGE tuple tags");
+        }
+      }
+
+      private Result(PCollectionTuple pct) {
+        this.pct = pct;
+        this.readResponse = pct.get(METADATA);
+        this.failedReads = pct.get(ERROR_MESSAGE);
+      }
+
+      /**
+       * Gets failed reads.
+       *
+       * @return the failed reads
+       */
+      public PCollection<String> getFailedReads() {
+        return failedReads;
+      }
+
+      /**
+       * Gets resources.
+       *
+       * @return the resources
+       */
+      public PCollection<String> getReadResponse() {
+        return readResponse;
+      }
+
+      @Override
+      public Pipeline getPipeline() {
+        return this.pct.getPipeline();
+      }
+
+      @Override
+      public Map<TupleTag<?>, PValue> expand() {
+        return ImmutableMap.of(METADATA, readResponse);
+      }
+
+      @Override
+      public void finishSpecifyingOutput(
+          String transformName, PInput input, PTransform<?, ?> transform) {}
+    }
+
+    /**
+     * DoFn to fetch the metadata of a study from a Dicom store based on its location and study id.
+     */
+    static class FetchStudyMetadataFn extends DoFn<PubsubMessage, String> {
+
+      private HealthcareApiClient dicomStore;
+
+      /**
+       * Instantiate the healthcare client.
+       *
+       * @throws IOException
+       */
+      @Setup
+      public void instantiateHealthcareClient() throws IOException {
+        this.dicomStore = new HttpHealthcareApiClient();
+      }
+
+      /**
+       * Process The Pub/Sub message.
+       *
+       * @param context The input containing the pub/sub message
+       */
+      @ProcessElement
+      public void processElement(ProcessContext context) {
+        PubsubMessage msg = context.element();
+        byte[] msgPayload = msg.getPayload();
+        try {
+          String dicomWebPath = new String(msgPayload, "UTF-8");
+          String responseData = dicomStore.retrieveDicomStudyMetadata(dicomWebPath);
+          context.output(METADATA, responseData);

Review comment:
       I see that you're making individual blocking requests to the dicomStore. I have a couple of questions:
   - Does DICOM support batched requests?
   - Does it support concurrent requests?
   
   Individual blocking calls may give your transform very low throughput. If you can use batching and/or async IO, you should be able to reach a higher throughput.
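
One way to picture the concurrent option is the plain-Java sketch below. It only illustrates issuing several individual requests in parallel; whether the DICOM API supports batched requests is not established in this thread. `ConcurrentFetcher` and `fetchAll` are hypothetical names, and the `fetch` function stands in for a call such as `retrieveDicomStudyMetadata`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

/** Hypothetical helper (not in the PR): runs individual fetches concurrently. */
final class ConcurrentFetcher {
  private ConcurrentFetcher() {}

  /**
   * Issues one request per path on a bounded thread pool and waits for all of them.
   * Results come back in the same order as the input paths.
   */
  static List<String> fetchAll(
      List<String> paths, Function<String, String> fetch, int maxParallelism) {
    ExecutorService pool = Executors.newFixedThreadPool(maxParallelism);
    try {
      List<CompletableFuture<String>> futures = new ArrayList<>();
      for (String path : paths) {
        futures.add(CompletableFuture.supplyAsync(() -> fetch.apply(path), pool));
      }
      List<String> results = new ArrayList<>();
      for (CompletableFuture<String> future : futures) {
        // join() rethrows a failed request wrapped in a CompletionException.
        results.add(future.join());
      }
      return results;
    } finally {
      pool.shutdown();
    }
  }
}
```

Inside a DoFn, a similar approach would need the pending futures to be completed before the bundle finishes, so this is a starting point rather than a drop-in replacement.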

##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java
##########
@@ -0,0 +1,177 @@
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * The DicomIO connectors allows Beam pipelines to make calls to the Dicom API of the Google Cloud
+ * Healthcare API (https://cloud.google.com/healthcare/docs/how-tos#dicom-guide).
+ */

Review comment:
       I recommend adding richer documentation here. Consider the FhirIO Javadoc:
   
   https://beam.apache.org/releases/javadoc/2.23.0/index.html?org/apache/beam/sdk/io/gcp/healthcare/FhirIO.html
   
   And the code:
   
   https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java#L91-L200




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] Aliraza-N commented on pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
Aliraza-N commented on pull request #13137:
URL: https://github.com/apache/beam/pull/13137#issuecomment-726858344


   @pabloem Hi, can you take another look please? I'm still a little uncertain about removing the nullness warning suppression. I think adding some explicit checks should catch any errors.





[GitHub] [beam] aaltay commented on pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
aaltay commented on pull request #13137:
URL: https://github.com/apache/beam/pull/13137#issuecomment-733941409


   What is the next step for this PR?





[GitHub] [beam] Aliraza-N commented on pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
Aliraza-N commented on pull request #13137:
URL: https://github.com/apache/beam/pull/13137#issuecomment-724238769


   The failures are happening because of this:
   
   > Task :sdks:java:io:google-cloud-platform:compileJava
   /home/runner/work/beam/beam/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java:149: error: [initialization.fields.uninitialized] the constructor does not initialize fields: dicomStore
       static class FetchStudyMetadataFn extends DoFn<String, String> {
              ^
   /home/runner/work/beam/beam/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java:176: error: [argument.type.incompatible] incompatible argument for parameter output of output.
             context.output(ERROR_MESSAGE, e.getMessage());
                                                       ^
     found   : @Initialized @Nullable String
     required: @Initialized @NonNull String
   
   Neither error reproduces locally, and the FhirIO connector has similar code in both places, so I'm not sure why this is happening.
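
One possible reading of the second error: `Throwable.getMessage()` can return null, so the nullness checker rejects passing it to an output that requires a non-null `String`. A minimal sketch of a guard, assuming a hypothetical helper class `ErrorMessages` that is not part of the PR:

```java
/** Hypothetical helper; not part of Beam or of this PR. */
final class ErrorMessages {
  private ErrorMessages() {}

  /**
   * Returns the throwable's message, or its class name when the message is
   * null, so the result is never null.
   */
  static String describe(Throwable t) {
    String message = t.getMessage();
    return message != null ? message : t.getClass().getName();
  }
}
```

With a helper like this, the failing call could be written as `context.output(ERROR_MESSAGE, ErrorMessages.describe(e))`.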





[GitHub] [beam] Aliraza-N commented on pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
Aliraza-N commented on pull request #13137:
URL: https://github.com/apache/beam/pull/13137#issuecomment-716755757


   R: @lukecwik
   Hi, can you take a look at this PR? Thanks





[GitHub] [beam] pabloem commented on pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #13137:
URL: https://github.com/apache/beam/pull/13137#issuecomment-722669497


   Can you add an integration test? Feel free to ping me to discuss whether to add one.





[GitHub] [beam] pabloem commented on a change in pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #13137:
URL: https://github.com/apache/beam/pull/13137#discussion_r522333663



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * The DicomIO connector allows Beam pipelines to make calls to the Dicom API of the Google Cloud
+ * Healthcare API (https://cloud.google.com/healthcare/docs/how-tos#dicom-guide).
+ *
+ * <h3>Reading Study-Level Metadata</h3>
+ *
+ * The study-level metadata for a dicom instance can be read with {@link ReadStudyMetadata}.
+ * Retrieve the metadata of a dicom instance given its store path as a string. This will return a
+ * {@link ReadStudyMetadata.Result}. You can fetch the successful calls using getReadResponse(), and
+ * any failed reads using getFailedReads().
+ *
+ * <h3>Example</h3>
+ *
+ * <pre>{@code
+ * Pipeline p = ...
+ * String webPath = ...
+ * DicomIO.ReadStudyMetadata.Result readMetadataResult =
+ *     p.apply(Create.of(webPath)).apply(DicomIO.readStudyMetadata());
+ * PCollection<String> goodRead = readMetadataResult.getReadResponse();
+ * PCollection<String> failRead = readMetadataResult.getFailedReads();
+ * }</pre>
+ */
+public class DicomIO {
+
+  public static ReadStudyMetadata readStudyMetadata() {
+    return new ReadStudyMetadata();
+  }
+
+  /**
+   * This class makes a call to the retrieve metadata endpoint
+   * (https://cloud.google.com/healthcare/docs/how-tos/dicomweb#retrieving_metadata). It defines a
+   * function that can be used to process a Pubsub message from a DICOM store, read the DICOM study
+   * path and get the metadata of the specified study. You can learn how to configure PubSub
+   * messages to be published when an instance is stored by following:
+   * https://cloud.google.com/healthcare/docs/how-tos/pubsub. The connector will output a {@link
+   * ReadStudyMetadata.Result} which will contain metadata of the study encoded as a json array.
+   */
+  public static class ReadStudyMetadata
+      extends PTransform<PCollection<String>, ReadStudyMetadata.Result> {
+
+    private ReadStudyMetadata() {}
+
+    /** TupleTag for the main output. */
+    public static final TupleTag<String> METADATA = new TupleTag<String>() {};
+    /** TupleTag for any error response. */
+    public static final TupleTag<String> ERROR_MESSAGE = new TupleTag<String>() {};
+
+    public static class Result implements POutput, PInput {
+      private PCollection<String> readResponse;
+
+      private PCollection<String> failedReads;
+
+      /** Contains both the response and error outputs from the transformation. */
+      PCollectionTuple pct;
+
+      /**
+       * Create DicomIO.ReadStudyMetadata.Result from PCollectionTuple which contains the response
+       * (with METADATA and ERROR_MESSAGE tags).
+       *
+       * @param pct the pct
+       * @return the read result
+       * @throws IllegalArgumentException the illegal argument exception
+       */
+      static ReadStudyMetadata.Result of(PCollectionTuple pct) throws IllegalArgumentException {
+        if (pct.getAll()
+            .keySet()
+            .containsAll((Collection<?>) TupleTagList.of(METADATA).and(ERROR_MESSAGE))) {
+          return new ReadStudyMetadata.Result(pct);
+        } else {
+          throw new IllegalArgumentException(
+              "The PCollection tuple must have the DicomIO.ReadStudyMetadata.METADATA "
+                  + "and DicomIO.ReadStudyMetadata.ERROR_MESSAGE tuple tags");
+        }
+      }
+
+      private Result(PCollectionTuple pct) {
+        this.pct = pct;
+        this.readResponse = pct.get(METADATA);
+        this.failedReads = pct.get(ERROR_MESSAGE);
+      }
+
+      /**
+       * Gets failed reads.
+       *
+       * @return the failed reads
+       */
+      public PCollection<String> getFailedReads() {
+        return failedReads;
+      }
+
+      /**
+       * Gets resources.
+       *
+       * @return the resources
+       */
+      public PCollection<String> getReadResponse() {
+        return readResponse;
+      }
+
+      @Override
+      public Pipeline getPipeline() {
+        return this.pct.getPipeline();
+      }
+
+      @Override
+      public Map<TupleTag<?>, PValue> expand() {
+        return ImmutableMap.of(METADATA, readResponse);
+      }
+
+      @Override
+      public void finishSpecifyingOutput(
+          String transformName, PInput input, PTransform<?, ?> transform) {}
+    }
+
+    /**
+     * DoFn to fetch the metadata of a study from a Dicom store based on its location and study id.
+     */
+    @SuppressWarnings({"nullness", "rawtypes"})
+    static class FetchStudyMetadataFn extends DoFn<String, String> {
+
+      private HealthcareApiClient dicomStore;

Review comment:
       I suspect that if you initialize the field at its declaration, as in the suggestion below, you can remove `nullness` from `@SuppressWarnings`.
   
   ```suggestion
         private HealthcareApiClient dicomStore = new HttpHealthcareApiClient();
   ```
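
If initializing at the declaration turns out to be awkward (the `@Setup` method quoted elsewhere in this thread declares `throws IOException`), another option is to keep the deferred assignment and fail fast when the field is still unset. The sketch below is plain Java with hypothetical names (`LazyClientHolder`, `Client`); it is not the Beam `DoFn` and makes no claim about what the nullness checker accepts without further annotations.

```java
import java.util.Objects;

/** Hypothetical stand-in for a DoFn that receives its client in a setup step. */
final class LazyClientHolder {
  /** Hypothetical stand-in for the healthcare client interface. */
  interface Client {
    String get(String path);
  }

  // Stays null until setup() runs.
  private Client client;

  /** Plays the role of the setup hook that creates the client. */
  void setup(Client newClient) {
    this.client = newClient;
  }

  /** Fails with a clear message if setup() has not run yet. */
  String fetch(String path) {
    Client ready = Objects.requireNonNull(client, "setup() must run before fetch()");
    return ready.get(path);
  }
}
```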







[GitHub] [beam] Aliraza-N commented on a change in pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
Aliraza-N commented on a change in pull request #13137:
URL: https://github.com/apache/beam/pull/13137#discussion_r510433340



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java
##########
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * The DicomIO connector allows Beam pipelines to read from the Dicom API of Google Cloud
+ * Healthcare.
+ *
+ * <p>Study-level metadata can be read using {@link ReadDicomStudyMetadata}. It expects a PubSub
+ * message as input, where the message's body contains the location of the study. You can learn
+ * how to configure PubSub messages to be published when an instance is stored in a data store by
+ * following: https://cloud.google.com/healthcare/docs/how-tos/pubsub. The connector will output a
+ * {@link ReadDicomStudyMetadata.Result} which will contain the metadata of a study encoded as a
+ * JSON string.
+ */
+public class DicomIO {
+
+  /** The type ReadDicomStudyMetadata. */
+  public static class ReadDicomStudyMetadata
+      extends PTransform<PCollection<PubsubMessage>, DicomIO.ReadDicomStudyMetadata.Result> {
+
+    public ReadDicomStudyMetadata() {}
+
+    public static final TupleTag<String> OUT = new TupleTag<String>() {};

Review comment:
       These are the names used in the FhirIO and HL7v2IO connectors. I've used the same ones for consistency.







[GitHub] [beam] Aliraza-N commented on a change in pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
Aliraza-N commented on a change in pull request #13137:
URL: https://github.com/apache/beam/pull/13137#discussion_r518770219



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * The DicomIO connectors allows Beam pipelines to make calls to the Dicom API of the Google Cloud
+ * Healthcare API (https://cloud.google.com/healthcare/docs/how-tos#dicom-guide).
+ */
+public class DicomIO {
+
+  /**
+   * This class makes a call to the retrieve metadata endpoint
+   * (https://cloud.google.com/healthcare/docs/how-tos/dicomweb#retrieving_metadata). It defines a
+   * function that can be used to process a Pubsub message from a DICOM store, read the DICOM study
+   * path and get the metadata of the specified study. You can learn how to configure PubSub
+   * messages to be published when an instance is stored by following:
+   * https://cloud.google.com/healthcare/docs/how-tos/pubsub. The connector will output a {@link
+   * ReadDicomStudyMetadata.Result} which will contain metadata of the study encoded as a json
+   * array.
+   */
+  public static class ReadDicomStudyMetadata
+      extends PTransform<PCollection<PubsubMessage>, DicomIO.ReadDicomStudyMetadata.Result> {
+
+    public ReadDicomStudyMetadata() {}
+
+    /** TupleTag for the main output. */
+    public static final TupleTag<String> METADATA = new TupleTag<String>() {};
+    /** TupleTag for any error response. */
+    public static final TupleTag<String> ERROR_MESSAGE = new TupleTag<String>() {};
+
+    public static class Result implements POutput, PInput {
+      private PCollection<String> readResponse;
+
+      private PCollection<String> failedReads;
+
+      /** Contains both the response and error outputs from the transformation. */
+      PCollectionTuple pct;
+
+      /**
+       * Create DicomIO.ReadDicomStudyMetadata.Result from PCollectionTuple which contains the
+       * response (with METADATA and ERROR_MESSAGE tags).
+       *
+       * @param pct the pct
+       * @return the read result
+       * @throws IllegalArgumentException the illegal argument exception
+       */
+      static DicomIO.ReadDicomStudyMetadata.Result of(PCollectionTuple pct)
+          throws IllegalArgumentException {
+        if (pct.getAll()
+            .keySet()
+            .containsAll((Collection<?>) TupleTagList.of(METADATA).and(ERROR_MESSAGE))) {
+          return new DicomIO.ReadDicomStudyMetadata.Result(pct);
+        } else {
+          throw new IllegalArgumentException(
+              "The PCollection tuple must have the DicomIO.ReadDicomStudyMetadata.METADATA "
+                  + "and DicomIO.ReadDicomStudyMetadata.ERROR_MESSAGE tuple tags");
+        }
+      }
+
+      private Result(PCollectionTuple pct) {
+        this.pct = pct;
+        this.readResponse = pct.get(METADATA);
+        this.failedReads = pct.get(ERROR_MESSAGE);
+      }
+
+      /**
+       * Gets failed reads.
+       *
+       * @return the failed reads
+       */
+      public PCollection<String> getFailedReads() {
+        return failedReads;
+      }
+
+      /**
+       * Gets resources.
+       *
+       * @return the resources
+       */
+      public PCollection<String> getReadResponse() {
+        return readResponse;
+      }
+
+      @Override
+      public Pipeline getPipeline() {
+        return this.pct.getPipeline();
+      }
+
+      @Override
+      public Map<TupleTag<?>, PValue> expand() {
+        return ImmutableMap.of(METADATA, readResponse);
+      }
+
+      @Override
+      public void finishSpecifyingOutput(
+          String transformName, PInput input, PTransform<?, ?> transform) {}
+    }
+
+    /**
+     * DoFn to fetch the metadata of a study from a Dicom store based on its location and study id.
+     */
+    static class FetchStudyMetadataFn extends DoFn<PubsubMessage, String> {
+
+      private HealthcareApiClient dicomStore;
+
+      /**
+       * Instantiate the healthcare client.
+       *
+       * @throws IOException
+       */
+      @Setup
+      public void instantiateHealthcareClient() throws IOException {
+        this.dicomStore = new HttpHealthcareApiClient();
+      }
+
+      /**
+       * Process The Pub/Sub message.
+       *
+       * @param context The input containing the pub/sub message
+       */
+      @ProcessElement
+      public void processElement(ProcessContext context) {
+        PubsubMessage msg = context.element();
+        byte[] msgPayload = msg.getPayload();
+        try {
+          String dicomWebPath = new String(msgPayload, "UTF-8");
+          String responseData = dicomStore.retrieveDicomStudyMetadata(dicomWebPath);
+          context.output(METADATA, responseData);

Review comment:
       Sure thing







[GitHub] [beam] Aliraza-N commented on a change in pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
Aliraza-N commented on a change in pull request #13137:
URL: https://github.com/apache/beam/pull/13137#discussion_r517592588



##########
File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.healthcare;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+
+/**
+ * The DicomIO connectors allows Beam pipelines to make calls to the Dicom API of the Google Cloud
+ * Healthcare API (https://cloud.google.com/healthcare/docs/how-tos#dicom-guide).
+ */
+public class DicomIO {
+
+  /**
+   * This class makes a call to the retrieve metadata endpoint
+   * (https://cloud.google.com/healthcare/docs/how-tos/dicomweb#retrieving_metadata). It defines a
+   * function that can be used to process a Pubsub message from a DICOM store, read the DICOM study
+   * path and get the metadata of the specified study. You can learn how to configure PubSub
+   * messages to be published when an instance is stored by following:
+   * https://cloud.google.com/healthcare/docs/how-tos/pubsub. The connector will output a {@link
+   * ReadDicomStudyMetadata.Result} which will contain metadata of the study encoded as a json
+   * array.
+   */
+  public static class ReadDicomStudyMetadata
+      extends PTransform<PCollection<PubsubMessage>, DicomIO.ReadDicomStudyMetadata.Result> {
+
+    public ReadDicomStudyMetadata() {}
+
+    /** TupleTag for the main output. */
+    public static final TupleTag<String> METADATA = new TupleTag<String>() {};
+    /** TupleTag for any error response. */
+    public static final TupleTag<String> ERROR_MESSAGE = new TupleTag<String>() {};
+
+    public static class Result implements POutput, PInput {
+      private PCollection<String> readResponse;
+
+      private PCollection<String> failedReads;
+
+      /** Contains both the response and error outputs from the transformation. */
+      PCollectionTuple pct;
+
+      /**
+       * Create DicomIO.ReadDicomStudyMetadata.Result from PCollectionTuple which contains the
+       * response (with METADATA and ERROR_MESSAGE tags).
+       *
+       * @param pct the pct
+       * @return the read result
+       * @throws IllegalArgumentException the illegal argument exception
+       */
+      static DicomIO.ReadDicomStudyMetadata.Result of(PCollectionTuple pct)
+          throws IllegalArgumentException {
+        if (pct.getAll()
+            .keySet()
+            .containsAll((Collection<?>) TupleTagList.of(METADATA).and(ERROR_MESSAGE))) {
+          return new DicomIO.ReadDicomStudyMetadata.Result(pct);
+        } else {
+          throw new IllegalArgumentException(
+              "The PCollection tuple must have the DicomIO.ReadDicomStudyMetadata.METADATA "
+                  + "and DicomIO.ReadDicomStudyMetadata.ERROR_MESSAGE tuple tags");
+        }
+      }
+
+      private Result(PCollectionTuple pct) {
+        this.pct = pct;
+        this.readResponse = pct.get(METADATA);
+        this.failedReads = pct.get(ERROR_MESSAGE);
+      }
+
+      /**
+       * Gets failed reads.
+       *
+       * @return the failed reads
+       */
+      public PCollection<String> getFailedReads() {
+        return failedReads;
+      }
+
+      /**
+       * Gets resources.
+       *
+       * @return the resources
+       */
+      public PCollection<String> getReadResponse() {
+        return readResponse;
+      }
+
+      @Override
+      public Pipeline getPipeline() {
+        return this.pct.getPipeline();
+      }
+
+      @Override
+      public Map<TupleTag<?>, PValue> expand() {
+        return ImmutableMap.of(METADATA, readResponse);
+      }
+
+      @Override
+      public void finishSpecifyingOutput(
+          String transformName, PInput input, PTransform<?, ?> transform) {}
+    }
+
+    /**
+     * DoFn to fetch the metadata of a study from a Dicom store based on its location and study id.
+     */
+    static class FetchStudyMetadataFn extends DoFn<PubsubMessage, String> {
+
+      private HealthcareApiClient dicomStore;
+
+      /**
+       * Instantiate the healthcare client.
+       *
+       * @throws IOException
+       */
+      @Setup
+      public void instantiateHealthcareClient() throws IOException {
+        this.dicomStore = new HttpHealthcareApiClient();
+      }
+
+      /**
+       * Process The Pub/Sub message.
+       *
+       * @param context The input containing the pub/sub message
+       */
+      @ProcessElement
+      public void processElement(ProcessContext context) {
+        PubsubMessage msg = context.element();
+        byte[] msgPayload = msg.getPayload();
+        try {
+          String dicomWebPath = new String(msgPayload, "UTF-8");
+          String responseData = dicomStore.retrieveDicomStudyMetadata(dicomWebPath);
+          context.output(METADATA, responseData);

Review comment:
       The Dicom API does support async calls. As you suggested, it is probably better to make use of this. 
   However, the read metadata endpoint is generally very fast, as it is transferring little data. So in the interest of time, would it be okay to leave it for now, and return to this in a later feature request?







[GitHub] [beam] Aliraza-N commented on pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
Aliraza-N commented on pull request #13137:
URL: https://github.com/apache/beam/pull/13137#issuecomment-726149041


   Run Java PreCommit





[GitHub] [beam] suztomo edited a comment on pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
suztomo edited a comment on pull request #13137:
URL: https://github.com/apache/beam/pull/13137#issuecomment-813596393


   @Aliraza-N DicomIOTest and the other GCP healthcare service tests require Application Default Credentials.
   
   > There are additional tests that suffer from this. They appear to be GCP integration tests, but have Test in the class name and so run under the :test task:
   > DicomIOTest.test_Dicom_failedMetadataRead
   > FhirIOTest.test_FhirIO_failedReads
   > FhirIOTest.test_FhirIO_failedWrites
   > HL7v2IOTest.test_HL7v2IO_failedReads
   > HL7v2IOTest.test_HL7v2IO_failedWrites
   
   https://issues.apache.org/jira/browse/BEAM-11363?focusedCommentId=17242773&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17242773
   
   Exception:
   
   ```
   Caused by: java.io.IOException: The Application Default Credentials are not available. They are available if running in Google Compute Engine. Otherwise, the environment variable GOOGLE_APPLICATION_CREDENTIALS must be defined pointing to a file defining the credentials. See https://developers.google.com/accounts/docs/application-default-credentials for more information.
   	at com.google.auth.oauth2.DefaultCredentialsProvider.getDefaultCredentials(DefaultCredentialsProvider.java:134)
   	at com.google.auth.oauth2.GoogleCredentials.getApplicationDefault(GoogleCredentials.java:119)
   	at com.google.auth.oauth2.GoogleCredentials.getApplicationDefault(GoogleCredentials.java:91)
   	at org.apache.beam.sdk.io.gcp.healthcare.HttpHealthcareApiClient.initClient(HttpHealthcareApiClient.java:691)
   	at org.apache.beam.sdk.io.gcp.healthcare.HttpHealthcareApiClient.<init>(HttpHealthcareApiClient.java:121)
   	at org.apache.beam.sdk.io.gcp.healthcare.DicomIO$ReadStudyMetadata$FetchStudyMetadataFn.instantiateHealthcareClient(DicomIO.java:164)
   ```
   
   Do you think they should be classified as integration tests?
   
   If not, is there a way to run this test in GitHub Actions?
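   
   For illustration, the exception above names one precondition that a test could check before running: the `GOOGLE_APPLICATION_CREDENTIALS` environment variable pointing at a credentials file. The following is a minimal sketch only. `AdcEnvCheck` and `credentialsFileConfigured` are hypothetical names, not part of Beam or the Google auth library, and the check deliberately ignores the other ways credentials can be available (for example, running on Google Compute Engine).
   
   ```java
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.nio.file.Paths;
   import java.util.Map;
   
   /** Hypothetical helper (an assumption for illustration; not Beam or google-auth code). */
   final class AdcEnvCheck {
     // Variable name taken from the exception message quoted above.
     static final String ENV_VAR = "GOOGLE_APPLICATION_CREDENTIALS";
   
     /** Returns true only if the given environment names an existing, readable regular file. */
     static boolean credentialsFileConfigured(Map<String, String> env) {
       String value = env.get(ENV_VAR);
       if (value == null || value.isEmpty()) {
         return false;
       }
       Path path = Paths.get(value);
       return Files.isRegularFile(path) && Files.isReadable(path);
     }
   
     public static void main(String[] args) {
       // Reports whether the current process environment passes the check.
       System.out.println(credentialsFileConfigured(System.getenv()));
     }
   }
   ```
   
   A test that needs real credentials could consult a check like this and skip itself when it returns false, which would keep it out of the way of the `:test` task on machines without credentials. Taking the environment as a `Map` parameter keeps the check itself testable without touching the real process environment.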





[GitHub] [beam] Aliraza-N commented on pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
Aliraza-N commented on pull request #13137:
URL: https://github.com/apache/beam/pull/13137#issuecomment-726108772


   Run Java PreCommit





[GitHub] [beam] pabloem commented on pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #13137:
URL: https://github.com/apache/beam/pull/13137#issuecomment-735917148


   Thanks! I've merged this, @Aliraza-N.





[GitHub] [beam] Aliraza-N commented on pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
Aliraza-N commented on pull request #13137:
URL: https://github.com/apache/beam/pull/13137#issuecomment-734456526


   @pabloem Hey, I've made some changes following feedback on my pipeline project. Can you take a look when you have a chance? Thanks.





[GitHub] [beam] Aliraza-N commented on pull request #13137: [BEAM-11073] Dicom IO Connector for Java

Posted by GitBox <gi...@apache.org>.
Aliraza-N commented on pull request #13137:
URL: https://github.com/apache/beam/pull/13137#issuecomment-724237629


   The tests are failing because of these errors in the build:
   
   > Task :sdks:java:io:google-cloud-platform:compileJava
   /home/runner/work/beam/beam/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java:149: error: [initialization.fields.uninitialized] the constructor does not initialize fields: dicomStore
       static class FetchStudyMetadataFn extends DoFn<String, String> {
              ^
   /home/runner/work/beam/beam/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/DicomIO.java:176: error: [argument.type.incompatible] incompatible argument for parameter output of output.
             context.output(ERROR_MESSAGE, e.getMessage());
                                                       ^
     found   : @Initialized @Nullable String
     required: @Initialized @NonNull String
   
   Neither error reproduces locally. The code here closely mirrors the FhirIO code, so I'm not sure why it's happening.
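   
   For reference, the two checker errors above correspond to two general Java patterns: a field that no constructor assigns, and `Throwable.getMessage()`, which may return null, being passed where a non-null `String` is required. The sketch below shows one common way to satisfy both checks in plain Java. `FetchFnSketch` and `nonNullMessage` are hypothetical names used for illustration; this is not the actual DicomIO code, and the thread does not say how the PR resolved the errors.
   
   ```java
   import java.util.Objects;
   
   /** Hypothetical stand-in for a DoFn-like class (an assumption; not the real DicomIO code). */
   final class FetchFnSketch {
     // Assigned in the constructor, so an initialization checker sees the field as set.
     private final String dicomStore;
   
     FetchFnSketch(String dicomStore) {
       this.dicomStore = Objects.requireNonNull(dicomStore, "dicomStore");
     }
   
     /** Throwable.getMessage() may return null, so fall back to the exception class name. */
     static String nonNullMessage(Throwable t) {
       String message = t.getMessage();
       return message != null ? message : t.getClass().getName();
     }
   
     String describe() {
       return "store=" + dicomStore;
     }
   
     public static void main(String[] args) {
       FetchFnSketch fn = new FetchFnSketch("example-store");
       System.out.println(fn.describe());
       // An exception created without a message still yields a non-null string.
       System.out.println(nonNullMessage(new RuntimeException()));
     }
   }
   ```
   
   Falling back to the class name is only one option; a fixed placeholder string would satisfy the non-null requirement equally well.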

