Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/08/05 19:22:48 UTC

[GitHub] [beam] George-Wu opened a new pull request #12473: [BEAM-10601] DICOM API Beam IO connector e2e test

George-Wu opened a new pull request #12473:
URL: https://github.com/apache/beam/pull/12473


   Add integration test for DICOM Beam IO connector
   
   ------------------------
   R: @pabloem
   CC: @dranderson1117, @DanKotowski
   
   Post-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   Lang | SDK | Dataflow | Flink | Samza | Spark | Twister2
   --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/) | ---
   Java | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/)
   Python | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/) | ---
   XLang | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/) | ---
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   --- | Java | Python | Go | Website
   --- | --- | --- | --- | ---
   Non-portable | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/)
   Portable | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/) | --- | ---
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for the trigger phrases, statuses, and links of all Jenkins jobs.
   
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   ![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   





[GitHub] [beam] pabloem commented on pull request #12473: [BEAM-10601] DICOM API Beam IO connector e2e test

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #12473:
URL: https://github.com/apache/beam/pull/12473#issuecomment-670677407


   LGTM. Let me run the lint locally to see what's going on.





[GitHub] [beam] George-Wu commented on a change in pull request #12473: [BEAM-10601] DICOM API Beam IO connector e2e test

Posted by GitBox <gi...@apache.org>.
George-Wu commented on a change in pull request #12473:
URL: https://github.com/apache/beam/pull/12473#discussion_r466109325



##########
File path: sdks/python/apache_beam/io/gcp/dicomio_integration_test.py
##########
@@ -0,0 +1,155 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Integration test for Google Cloud DICOM IO connector.
+"""
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import json
+import time
+import unittest
+
+from google.auth import default
+from google.auth.transport import requests
+from google.cloud import storage
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.io import fileio
+from apache_beam.io.gcp.dicomio import DicomSearch
+from apache_beam.io.gcp.dicomio import UploadToDicomStore
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+REGION = 'us-central1'
+DATA_SET_ID = 'apache-beam-integration-testing'
+HEALTHCARE_BASE_URL = 'https://healthcare.googleapis.com/v1'
+PERSISTENT_DICOM_STORE_NAME = "dicom_it_persistent_store"
+DICOM_FILES_PATH = "gs://temp-storage-for-dicom-io-tests/dicom_files"
+NUM_INSTANCE = 18
+
+
+def create_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Create an empty DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+      HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores".format(api_endpoint, dataset_id)
+
+  response = session.post(
+      dicomweb_path, params={"dicomStoreId": dicom_store_id})
+  response.raise_for_status()
+  return response.status_code
+
+
+def delete_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Delete an existing DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+      HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores/{}".format(
+      api_endpoint, dataset_id, dicom_store_id)
+
+  response = session.delete(dicomweb_path)
+  response.raise_for_status()
+  return response.status_code
+
+
+class DICOMIoIntegrationTest(unittest.TestCase):
+  expected_output_metadata = None
+
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.project = self.test_pipeline.get_option('project')
+
+    # create a temp DICOM store based on the timestamp
+    self.temp_dicom_store = "DICOM_store_" + str(time.time())

Review comment:
       Changed







[GitHub] [beam] DanKotowski commented on a change in pull request #12473: [BEAM-10601] DICOM API Beam IO connector e2e test

Posted by GitBox <gi...@apache.org>.
DanKotowski commented on a change in pull request #12473:
URL: https://github.com/apache/beam/pull/12473#discussion_r465976291



##########
File path: sdks/python/apache_beam/io/gcp/dicomio_integration_test.py
##########
@@ -0,0 +1,155 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Integration test for Google Cloud DICOM IO connector.
+"""
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import json
+import time
+import unittest
+
+from google.auth import default
+from google.auth.transport import requests
+from google.cloud import storage
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.io import fileio
+from apache_beam.io.gcp.dicomio import DicomSearch
+from apache_beam.io.gcp.dicomio import UploadToDicomStore
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+REGION = 'us-central1'
+DATA_SET_ID = 'apache-beam-integration-testing'
+HEALTHCARE_BASE_URL = 'https://healthcare.googleapis.com/v1'
+PERSISTENT_DICOM_STORE_NAME = "dicom_it_persistent_store"
+DICOM_FILES_PATH = "gs://temp-storage-for-dicom-io-tests/dicom_files"
+NUM_INSTANCE = 18
+
+
+def create_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Create an empty DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+      HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores".format(api_endpoint, dataset_id)
+
+  response = session.post(
+      dicomweb_path, params={"dicomStoreId": dicom_store_id})
+  response.raise_for_status()
+  return response.status_code
+
+
+def delete_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Delete an existing DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+      HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores/{}".format(
+      api_endpoint, dataset_id, dicom_store_id)
+
+  response = session.delete(dicomweb_path)
+  response.raise_for_status()
+  return response.status_code
+
+
+class DICOMIoIntegrationTest(unittest.TestCase):
+  expected_output_metadata = None
+
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.project = self.test_pipeline.get_option('project')
+
+    # create a temp DICOM store based on the timestamp
+    self.temp_dicom_store = "DICOM_store_" + str(time.time())

Review comment:
       If tests run in parallel, two stores could end up with the same name because the name is derived from the current time. It may be best to generate a random string instead.
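
       As a minimal sketch of that suggestion, one collision-resistant option is to derive the id from a UUID (the PR ultimately adopted a random_string_generator helper, so this uuid variant is only illustrative):

           import uuid

           def unique_dicom_store_id(prefix='DICOM_store_'):
             # uuid4 carries 122 bits of randomness, so two parallel test
             # runs are effectively guaranteed distinct store ids.
             return prefix + uuid.uuid4().hex

           # e.g. unique_dicom_store_id() -> 'DICOM_store_5f2b...'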

##########
File path: sdks/python/apache_beam/io/gcp/dicomio_integration_test.py
##########
@@ -0,0 +1,155 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Integration test for Google Cloud DICOM IO connector.
+"""
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import json
+import time
+import unittest
+
+from google.auth import default
+from google.auth.transport import requests
+from google.cloud import storage
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.io import fileio
+from apache_beam.io.gcp.dicomio import DicomSearch
+from apache_beam.io.gcp.dicomio import UploadToDicomStore
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+REGION = 'us-central1'
+DATA_SET_ID = 'apache-beam-integration-testing'
+HEALTHCARE_BASE_URL = 'https://healthcare.googleapis.com/v1'
+PERSISTENT_DICOM_STORE_NAME = "dicom_it_persistent_store"
+DICOM_FILES_PATH = "gs://temp-storage-for-dicom-io-tests/dicom_files"
+NUM_INSTANCE = 18
+
+
+def create_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Create an empty DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+      HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores".format(api_endpoint, dataset_id)
+
+  response = session.post(
+      dicomweb_path, params={"dicomStoreId": dicom_store_id})
+  response.raise_for_status()
+  return response.status_code
+
+
+def delete_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Delete an existing DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+      HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores/{}".format(
+      api_endpoint, dataset_id, dicom_store_id)
+
+  response = session.delete(dicomweb_path)
+  response.raise_for_status()
+  return response.status_code
+
+
+class DICOMIoIntegrationTest(unittest.TestCase):
+  expected_output_metadata = None
+
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.project = self.test_pipeline.get_option('project')
+
+    # create a temp DICOM store based on the timestamp
+    self.temp_dicom_store = "DICOM_store_" + str(time.time())
+    create_dicom_store(self.project, DATA_SET_ID, REGION, self.temp_dicom_store)
+    client = storage.Client()
+    bucket = client.get_bucket('temp-storage-for-dicom-io-tests')
+    blob = bucket.blob('meta_data_json/Dicom_io_it_test_data.json')
+    data = json.loads(blob.download_as_string())
+    self.expected_output_metadata = data
+
+  def tearDown(self):
+    # clean up the temp Dicom store
+    delete_dicom_store(self.project, DATA_SET_ID, REGION, self.temp_dicom_store)
+
+  @attr('IT')
+  def test_dicom_search(self):

Review comment:
       The name of a test should describe the behaviour it is testing; could you update all your test names?
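
       For instance, names along these lines (the later revision of this PR does rename the search test to test_dicom_search_all_instances; the storage name below is only illustrative):

           def test_dicom_search_all_instances(self):
             ...

           def test_dicom_store_instances_from_gcs(self):
             ...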

##########
File path: sdks/python/apache_beam/io/gcp/dicomio_integration_test.py
##########
@@ -0,0 +1,155 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Integration test for Google Cloud DICOM IO connector.
+"""
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import json
+import time
+import unittest
+
+from google.auth import default
+from google.auth.transport import requests
+from google.cloud import storage
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.io import fileio
+from apache_beam.io.gcp.dicomio import DicomSearch
+from apache_beam.io.gcp.dicomio import UploadToDicomStore
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+REGION = 'us-central1'
+DATA_SET_ID = 'apache-beam-integration-testing'
+HEALTHCARE_BASE_URL = 'https://healthcare.googleapis.com/v1'
+PERSISTENT_DICOM_STORE_NAME = "dicom_it_persistent_store"
+DICOM_FILES_PATH = "gs://temp-storage-for-dicom-io-tests/dicom_files"
+NUM_INSTANCE = 18
+
+
+def create_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Create an empty DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+      HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores".format(api_endpoint, dataset_id)
+
+  response = session.post(
+      dicomweb_path, params={"dicomStoreId": dicom_store_id})
+  response.raise_for_status()
+  return response.status_code
+
+
+def delete_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Delete an existing DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+      HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores/{}".format(
+      api_endpoint, dataset_id, dicom_store_id)
+
+  response = session.delete(dicomweb_path)
+  response.raise_for_status()
+  return response.status_code
+
+
+class DICOMIoIntegrationTest(unittest.TestCase):
+  expected_output_metadata = None
+
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.project = self.test_pipeline.get_option('project')
+
+    # create a temp DICOM store based on the timestamp
+    self.temp_dicom_store = "DICOM_store_" + str(time.time())
+    create_dicom_store(self.project, DATA_SET_ID, REGION, self.temp_dicom_store)
+    client = storage.Client()
+    bucket = client.get_bucket('temp-storage-for-dicom-io-tests')
+    blob = bucket.blob('meta_data_json/Dicom_io_it_test_data.json')
+    data = json.loads(blob.download_as_string())
+    self.expected_output_metadata = data
+
+  def tearDown(self):
+    # clean up the temp Dicom store
+    delete_dicom_store(self.project, DATA_SET_ID, REGION, self.temp_dicom_store)
+
+  @attr('IT')
+  def test_dicom_search(self):

Review comment:
       I think it would also make sense to add an additional test that does a somewhat more complex search.
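
       A hedged sketch of such a test input, based on the dicts used elsewhere in this file; the 'params' key for QIDO-RS filters and the UID value are assumptions, not the connector's confirmed API (the later revision does add a refined-search test, see META_DATA_REFINED_NAME):

           refined_input_dict = {
               'project_id': self.project,
               'region': REGION,
               'dataset_id': DATA_SET_ID,
               'dicom_store_id': PERSISTENT_DICOM_STORE_NAME,
               'search_type': 'instances',
               # hypothetical QIDO-RS filter; key name assumed
               'params': {'StudyInstanceUID': 'study_000000001'},
           }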

##########
File path: sdks/python/apache_beam/io/gcp/dicomio_integration_test.py
##########
@@ -0,0 +1,155 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Integration test for Google Cloud DICOM IO connector.
+"""
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import json
+import time
+import unittest
+
+from google.auth import default
+from google.auth.transport import requests
+from google.cloud import storage
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.io import fileio
+from apache_beam.io.gcp.dicomio import DicomSearch
+from apache_beam.io.gcp.dicomio import UploadToDicomStore
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+REGION = 'us-central1'
+DATA_SET_ID = 'apache-beam-integration-testing'
+HEALTHCARE_BASE_URL = 'https://healthcare.googleapis.com/v1'
+PERSISTENT_DICOM_STORE_NAME = "dicom_it_persistent_store"
+DICOM_FILES_PATH = "gs://temp-storage-for-dicom-io-tests/dicom_files"
+NUM_INSTANCE = 18
+
+
+def create_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Create an empty DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+      HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores".format(api_endpoint, dataset_id)
+
+  response = session.post(
+      dicomweb_path, params={"dicomStoreId": dicom_store_id})
+  response.raise_for_status()
+  return response.status_code
+
+
+def delete_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Delete an existing DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+      HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores/{}".format(
+      api_endpoint, dataset_id, dicom_store_id)
+
+  response = session.delete(dicomweb_path)
+  response.raise_for_status()
+  return response.status_code
+
+
+class DICOMIoIntegrationTest(unittest.TestCase):
+  expected_output_metadata = None
+
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.project = self.test_pipeline.get_option('project')
+
+    # create a temp DICOM store based on the timestamp
+    self.temp_dicom_store = "DICOM_store_" + str(time.time())
+    create_dicom_store(self.project, DATA_SET_ID, REGION, self.temp_dicom_store)
+    client = storage.Client()
+    bucket = client.get_bucket('temp-storage-for-dicom-io-tests')
+    blob = bucket.blob('meta_data_json/Dicom_io_it_test_data.json')
+    data = json.loads(blob.download_as_string())
+    self.expected_output_metadata = data
+
+  def tearDown(self):
+    # clean up the temp Dicom store
+    delete_dicom_store(self.project, DATA_SET_ID, REGION, self.temp_dicom_store)
+
+  @attr('IT')
+  def test_dicom_search(self):
+    # Search and compare the metadata of a persistent DICOM store.
+    input_dict = {}
+    input_dict['project_id'] = self.project
+    input_dict['region'] = REGION
+    input_dict['dataset_id'] = DATA_SET_ID
+    input_dict['dicom_store_id'] = PERSISTENT_DICOM_STORE_NAME
+    input_dict['search_type'] = "instances"
+
+    expected_dict = {}
+    expected_dict['result'] = self.expected_output_metadata
+    expected_dict['status'] = 200
+    expected_dict['input'] = input_dict
+    expected_dict['success'] = True
+
+    with TestPipeline() as p:
+      results = (p | beam.Create([input_dict]) | DicomSearch())
+      assert_that(results, equal_to([expected_dict]))
+
+  @attr('IT')
+  def test_dicom_store_instance(self):
+    # Store DICOM files from a GCS bucket to an empty DICOM store,
+    # then check that the store metadata matches.
+    input_dict = {}
+    input_dict['project_id'] = self.project
+    input_dict['region'] = REGION
+    input_dict['dataset_id'] = DATA_SET_ID
+    input_dict['dicom_store_id'] = self.temp_dicom_store
+    input_dict['search_type'] = "instances"
+
+    expected_dict = {}
+    expected_dict['result'] = self.expected_output_metadata
+    expected_dict['status'] = 200
+    expected_dict['input'] = input_dict
+    expected_dict['success'] = True
+
+    with TestPipeline() as p:
+      gcs_path = DICOM_FILES_PATH + "/*"
+      results = (
+          p
+          | fileio.MatchFiles(gcs_path)
+          | fileio.ReadMatches()
+          | UploadToDicomStore(input_dict, 'fileio')
+          | beam.Map(lambda x: x['success']))
+      assert_that(results, equal_to([True] * NUM_INSTANCE))

Review comment:
       We should define the expected value above and not inline.
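
       Sketched against the test above, the suggestion amounts to:

           # define the expected value next to the other constants,
           # not inline in the assertion
           expected_upload_results = [True] * NUM_INSTANCE

           assert_that(results, equal_to(expected_upload_results))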

##########
File path: sdks/python/apache_beam/io/gcp/dicomio_integration_test.py
##########
@@ -0,0 +1,155 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Integration test for Google Cloud DICOM IO connector.
+"""
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import json
+import time
+import unittest
+
+from google.auth import default
+from google.auth.transport import requests
+from google.cloud import storage
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.io import fileio
+from apache_beam.io.gcp.dicomio import DicomSearch
+from apache_beam.io.gcp.dicomio import UploadToDicomStore
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+REGION = 'us-central1'
+DATA_SET_ID = 'apache-beam-integration-testing'
+HEALTHCARE_BASE_URL = 'https://healthcare.googleapis.com/v1'
+PERSISTENT_DICOM_STORE_NAME = "dicom_it_persistent_store"
+DICOM_FILES_PATH = "gs://temp-storage-for-dicom-io-tests/dicom_files"
+NUM_INSTANCE = 18
+
+
+def create_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Create a an empty DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+      HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores".format(api_endpoint, dataset_id)
+
+  response = session.post(
+      dicomweb_path, params={"dicomStoreId": dicom_store_id})
+  response.raise_for_status()
+  return response.status_code
+
+
+def delete_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Delete an existing DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+      HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores/{}".format(
+      api_endpoint, dataset_id, dicom_store_id)
+
+  response = session.delete(dicomweb_path)
+  response.raise_for_status()
+  return response.status_code
+
+
+class DICOMIoIntegrationTest(unittest.TestCase):
+  expected_output_metadata = None
+
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.project = self.test_pipeline.get_option('project')
+
+    # create a temp DICOM store based on the timestamp
+    self.temp_dicom_store = "DICOM_store_" + str(time.time())
+    create_dicom_store(self.project, DATA_SET_ID, REGION, self.temp_dicom_store)
+    client = storage.Client()
+    bucket = client.get_bucket('temp-storage-for-dicom-io-tests')
+    blob = bucket.blob('meta_data_json/Dicom_io_it_test_data.json')
+    data = json.loads(blob.download_as_string())
+    self.expected_output_metadata = data
+
+  def tearDown(self):
+    # clean up the temp Dicom store
+    delete_dicom_store(self.project, DATA_SET_ID, REGION, self.temp_dicom_store)
+
+  @attr('IT')
+  def test_dicom_search(self):
+    # Search and compare the metadata of a persistent DICOM store.
+    input_dict = {}
+    input_dict['project_id'] = self.project
+    input_dict['region'] = REGION
+    input_dict['dataset_id'] = DATA_SET_ID
+    input_dict['dicom_store_id'] = PERSISTENT_DICOM_STORE_NAME
+    input_dict['search_type'] = "instances"
+
+    expected_dict = {}
+    expected_dict['result'] = self.expected_output_metadata
+    expected_dict['status'] = 200
+    expected_dict['input'] = input_dict
+    expected_dict['success'] = True
+
+    with TestPipeline() as p:
+      results = (p | beam.Create([input_dict]) | DicomSearch())
+      assert_that(results, equal_to([expected_dict]))
+
+  @attr('IT')
+  def test_dicom_store_instance(self):
+    # Store DICOM files from a GCS bucket to an empty DICOM store,
+    # then check that the store metadata matches.
+    input_dict = {}
+    input_dict['project_id'] = self.project
+    input_dict['region'] = REGION
+    input_dict['dataset_id'] = DATA_SET_ID
+    input_dict['dicom_store_id'] = self.temp_dicom_store
+    input_dict['search_type'] = "instances"
+
+    expected_dict = {}
+    expected_dict['result'] = self.expected_output_metadata
+    expected_dict['status'] = 200
+    expected_dict['input'] = input_dict
+    expected_dict['success'] = True
+
+    with TestPipeline() as p:
+      gcs_path = DICOM_FILES_PATH + "/*"
+      results = (
+          p
+          | fileio.MatchFiles(gcs_path)
+          | fileio.ReadMatches()
+          | UploadToDicomStore(input_dict, 'fileio')

Review comment:
       Could we separate the input_dict between the store operation and the QIDO operation?
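
       One way the split could look, reusing only names already present in this test (the split itself is illustrative, not the PR's final shape):

           # configuration for the STOW-RS (store) step
           store_dict = {
               'project_id': self.project,
               'region': REGION,
               'dataset_id': DATA_SET_ID,
               'dicom_store_id': self.temp_dicom_store,
           }
           # configuration for the QIDO-RS (search) verification step
           search_dict = dict(store_dict, search_type='instances')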

##########
File path: sdks/python/apache_beam/io/gcp/dicomio_integration_test.py
##########
@@ -0,0 +1,155 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Integration test for Google Cloud DICOM IO connector.
+"""
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import json
+import time
+import unittest
+
+from google.auth import default
+from google.auth.transport import requests
+from google.cloud import storage
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.io import fileio
+from apache_beam.io.gcp.dicomio import DicomSearch
+from apache_beam.io.gcp.dicomio import UploadToDicomStore
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+REGION = 'us-central1'
+DATA_SET_ID = 'apache-beam-integration-testing'
+HEALTHCARE_BASE_URL = 'https://healthcare.googleapis.com/v1'
+PERSISTENT_DICOM_STORE_NAME = "dicom_it_persistent_store"
+DICOM_FILES_PATH = "gs://temp-storage-for-dicom-io-tests/dicom_files"
+NUM_INSTANCE = 18
+
+
+def create_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Create a an empty DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+      HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores".format(api_endpoint, dataset_id)
+
+  response = session.post(
+      dicomweb_path, params={"dicomStoreId": dicom_store_id})
+  response.raise_for_status()
+  return response.status_code
+
+
+def delete_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Delete an existing DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+      HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores/{}".format(
+      api_endpoint, dataset_id, dicom_store_id)
+
+  response = session.delete(dicomweb_path)
+  response.raise_for_status()
+  return response.status_code
+
+
+class DICOMIoIntegrationTest(unittest.TestCase):

Review comment:
       Is it possible to add a pub/sub test? Potentially we could run an UploadToDicomStore and then check that we are receiving the correct messages.
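
       A rough sketch of such a check, assuming the DICOM store were configured with a Pub/Sub notification topic and a pull subscription existed; every name here is hypothetical and none of it is part of this PR:

           from google.cloud import pubsub_v1

           def pull_store_notifications(project_id, subscription_id, max_messages):
             # Pull the notifications published when instances are stored.
             subscriber = pubsub_v1.SubscriberClient()
             path = subscriber.subscription_path(project_id, subscription_id)
             response = subscriber.pull(subscription=path, max_messages=max_messages)
             # Each message's data field carries the resource path of a stored
             # instance, which could be compared against the uploaded files.
             return [m.message.data.decode('utf-8') for m in response.received_messages]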







[GitHub] [beam] George-Wu commented on a change in pull request #12473: [BEAM-10601] DICOM API Beam IO connector e2e test

Posted by GitBox <gi...@apache.org>.
George-Wu commented on a change in pull request #12473:
URL: https://github.com/apache/beam/pull/12473#discussion_r466736883



##########
File path: sdks/python/apache_beam/io/gcp/dicomio_integration_test.py
##########
@@ -0,0 +1,215 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Integration test for Google Cloud DICOM IO connector.
+"""
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import random
+import string
+import unittest
+
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.io import fileio
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from apache_beam.io.gcp.dicomio import DicomSearch
+  from apache_beam.io.gcp.dicomio import UploadToDicomStore
+  from google.auth import default
+  from google.auth.transport import requests
+except ImportError:
+  DicomSearch = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+REGION = 'us-central1'
+DATA_SET_ID = 'apache-beam-integration-testing'
+HEALTHCARE_BASE_URL = 'https://healthcare.googleapis.com/v1'
+GCS_BASE_URL = 'https://storage.googleapis.com/storage/v1'
+PERSISTENT_DICOM_STORE_NAME = "dicom_it_persistent_store"
+TEMP_BUCKET_NAME = 'temp-storage-for-dicom-io-tests'
+TEMP_FILES_PATH = 'gs://' + TEMP_BUCKET_NAME
+META_DATA_ALL_NAME = 'Dicom_io_it_test_data.json'
+META_DATA_REFINED_NAME = 'Dicom_io_it_test_refined_data.json'
+NUM_INSTANCE = 18
+RAND_LEN = 15
+
+
+def random_string_generator(length):
+  letters_and_digits = string.ascii_letters + string.digits
+  result = ''.join((random.choice(letters_and_digits) for i in range(length)))
+  return result
+
+
+def create_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Create an empty DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+      HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores".format(api_endpoint, dataset_id)
+
+  response = session.post(
+      dicomweb_path, params={"dicomStoreId": dicom_store_id})
+  response.raise_for_status()
+  return response.status_code
+
+
+def delete_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Delete an existing DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+      HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores/{}".format(
+      api_endpoint, dataset_id, dicom_store_id)
+
+  response = session.delete(dicomweb_path)
+  response.raise_for_status()
+  return response.status_code
+
+
+def get_gcs_file_http(file_name):
+  # Get a GCS file via the REST API
+  api_endpoint = "{}/b/{}/o/{}?alt=media".format(
+      GCS_BASE_URL, TEMP_BUCKET_NAME, file_name)
+
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+
+  response = session.get(api_endpoint)
+  response.raise_for_status()
+  return response.json()
+
+
+@unittest.skipIf(DicomSearch is None, 'GCP dependencies are not installed')
+class DICOMIoIntegrationTest(unittest.TestCase):
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.project = self.test_pipeline.get_option('project')
+    self.expected_output_all_metadata = get_gcs_file_http(META_DATA_ALL_NAME)
+    self.expected_output_refined_metadata = get_gcs_file_http(
+        META_DATA_REFINED_NAME)
+
+    # create a temp DICOM store named with a random suffix
+    self.temp_dicom_store = "DICOM_store_" + random_string_generator(RAND_LEN)
+    create_dicom_store(self.project, DATA_SET_ID, REGION, self.temp_dicom_store)
+
+  def tearDown(self):
+    # clean up the temp Dicom store
+    delete_dicom_store(self.project, DATA_SET_ID, REGION, self.temp_dicom_store)
+
+  @attr('IT')
+  def test_dicom_search_all_instances(self):

Review comment:
       pipeline reduced







[GitHub] [beam] pabloem merged pull request #12473: [BEAM-10601] DICOM API Beam IO connector e2e test

Posted by GitBox <gi...@apache.org>.
pabloem merged pull request #12473:
URL: https://github.com/apache/beam/pull/12473


   





[GitHub] [beam] pabloem commented on pull request #12473: [BEAM-10601] DICOM API Beam IO connector e2e test

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #12473:
URL: https://github.com/apache/beam/pull/12473#issuecomment-670674085


   Run PythonLint PreCommit





[GitHub] [beam] George-Wu commented on a change in pull request #12473: [BEAM-10601] DICOM API Beam IO connector e2e test

Posted by GitBox <gi...@apache.org>.
George-Wu commented on a change in pull request #12473:
URL: https://github.com/apache/beam/pull/12473#discussion_r466109042



##########
File path: sdks/python/apache_beam/io/gcp/dicomio_integration_test.py
##########
@@ -0,0 +1,155 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Integration test for Google Cloud DICOM IO connector.
+"""
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import json
+import time
+import unittest
+
+from google.auth import default
+from google.auth.transport import requests
+from google.cloud import storage
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.io import fileio
+from apache_beam.io.gcp.dicomio import DicomSearch
+from apache_beam.io.gcp.dicomio import UploadToDicomStore
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+REGION = 'us-central1'
+DATA_SET_ID = 'apache-beam-integration-testing'
+HEALTHCARE_BASE_URL = 'https://healthcare.googleapis.com/v1'
+PERSISTENT_DICOM_STORE_NAME = "dicom_it_persistent_store"
+DICOM_FILES_PATH = "gs://temp-storage-for-dicom-io-tests/dicom_files"
+NUM_INSTANCE = 18
+
+
+def create_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Create an empty DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+      HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores".format(api_endpoint, dataset_id)
+
+  response = session.post(
+      dicomweb_path, params={"dicomStoreId": dicom_store_id})
+  response.raise_for_status()
+  return response.status_code
+
+
+def delete_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Delete an existing DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+      HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores/{}".format(
+      api_endpoint, dataset_id, dicom_store_id)
+
+  response = session.delete(dicomweb_path)
+  response.raise_for_status()
+  return response.status_code
+
+
+class DICOMIoIntegrationTest(unittest.TestCase):

Review comment:
       I don't think a pub/sub test is necessary here because our connector has no direct dependency on, or invocation of, pub/sub. The 'pubsub to search metadata' converter was covered by the unit tests, while read and write operations can be tested more easily here than through pub/sub. What's more, testing a streaming pipeline is complicated and needs much more work :(







[GitHub] [beam] George-Wu commented on a change in pull request #12473: [BEAM-10601] DICOM API Beam IO connector e2e test

Posted by GitBox <gi...@apache.org>.
George-Wu commented on a change in pull request #12473:
URL: https://github.com/apache/beam/pull/12473#discussion_r466736695



##########
File path: sdks/python/apache_beam/io/gcp/dicomio_integration_test.py
##########
@@ -0,0 +1,215 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Integration test for Google Cloud DICOM IO connector.

Review comment:
       Added documentation










[GitHub] [beam] George-Wu commented on pull request #12473: [BEAM-10601] DICOM API Beam IO connector e2e test

Posted by GitBox <gi...@apache.org>.
George-Wu commented on pull request #12473:
URL: https://github.com/apache/beam/pull/12473#issuecomment-670651439


   Removed raise_for_status so HTTP errors can be recorded.
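
   As a sketch of the idea (the actual change is in the connector, and its output shape there may differ): instead of calling response.raise_for_status(), keep the status as data so the pipeline can record failures rather than abort:

       response = session.post(dicomweb_path, params={'dicomStoreId': dicom_store_id})
       # keep the error as data instead of raising
       out = {'status': response.status_code, 'success': response.status_code == 200}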





[GitHub] [beam] pabloem commented on pull request #12473: [BEAM-10601] DICOM API Beam IO connector e2e test

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #12473:
URL: https://github.com/apache/beam/pull/12473#issuecomment-670680899


   Run Python 3.8 PostCommit





[GitHub] [beam] George-Wu commented on a change in pull request #12473: [BEAM-10601] DICOM API Beam IO connector e2e test

Posted by GitBox <gi...@apache.org>.
George-Wu commented on a change in pull request #12473:
URL: https://github.com/apache/beam/pull/12473#discussion_r466736948



##########
File path: sdks/python/apache_beam/io/gcp/dicomio_integration_test.py
##########
@@ -0,0 +1,215 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Integration test for Google Cloud DICOM IO connector.
+"""
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import random
+import string
+import unittest
+
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.io import fileio
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from apache_beam.io.gcp.dicomio import DicomSearch
+  from apache_beam.io.gcp.dicomio import UploadToDicomStore
+  from google.auth import default
+  from google.auth.transport import requests
+except ImportError:
+  DicomSearch = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+REGION = 'us-central1'
+DATA_SET_ID = 'apache-beam-integration-testing'
+HEALTHCARE_BASE_URL = 'https://healthcare.googleapis.com/v1'
+GCS_BASE_URL = 'https://storage.googleapis.com/storage/v1'
+PERSISTENT_DICOM_STORE_NAME = "dicom_it_persistent_store"
+TEMP_BUCKET_NAME = 'temp-storage-for-dicom-io-tests'
+TEMP_FILES_PATH = 'gs://' + TEMP_BUCKET_NAME
+META_DATA_ALL_NAME = 'Dicom_io_it_test_data.json'
+META_DATA_REFINED_NAME = 'Dicom_io_it_test_refined_data.json'
+NUM_INSTANCE = 18
+RAND_LEN = 15
+
+
+def random_string_generator(length):
+  letters_and_digits = string.ascii_letters + string.digits
+  result = ''.join((random.choice(letters_and_digits) for i in range(length)))
+  return result
+
+
+def create_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Create an empty DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+      HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores".format(api_endpoint, dataset_id)
+
+  response = session.post(
+      dicomweb_path, params={"dicomStoreId": dicom_store_id})
+  response.raise_for_status()
+  return response.status_code
+
+
+def delete_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Delete an existing DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+      HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores/{}".format(
+      api_endpoint, dataset_id, dicom_store_id)
+
+  response = session.delete(dicomweb_path)
+  response.raise_for_status()
+  return response.status_code
+
+
+def get_gcs_file_http(file_name):
+  # Get a GCS file via the REST API
+  api_endpoint = "{}/b/{}/o/{}?alt=media".format(
+      GCS_BASE_URL, TEMP_BUCKET_NAME, file_name)
+
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+
+  response = session.get(api_endpoint)
+  response.raise_for_status()
+  return response.json()
+
+
+@unittest.skipIf(DicomSearch is None, 'GCP dependencies are not installed')
+class DICOMIoIntegrationTest(unittest.TestCase):
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.project = self.test_pipeline.get_option('project')
+    self.expected_output_all_metadata = get_gcs_file_http(META_DATA_ALL_NAME)
+    self.expected_output_refined_metadata = get_gcs_file_http(
+        META_DATA_REFINED_NAME)
+
+    # create a temp DICOM store with a random name suffix
+    self.temp_dicom_store = "DICOM_store_" + random_string_generator(RAND_LEN)
+    create_dicom_store(self.project, DATA_SET_ID, REGION, self.temp_dicom_store)
+
+  def tearDown(self):
+    # clean up the temp Dicom store
+    delete_dicom_store(self.project, DATA_SET_ID, REGION, self.temp_dicom_store)
+
+  @attr('IT')
+  def test_dicom_search_all_instances(self):
+    # Search and compare the metadata of a persistent DICOM store.
+    input_dict = {}
+    input_dict['project_id'] = self.project
+    input_dict['region'] = REGION
+    input_dict['dataset_id'] = DATA_SET_ID
+    input_dict['dicom_store_id'] = PERSISTENT_DICOM_STORE_NAME
+    input_dict['search_type'] = "instances"
+
+    expected_dict = {}
+    expected_dict['result'] = self.expected_output_all_metadata
+    expected_dict['status'] = 200
+    expected_dict['input'] = input_dict
+    expected_dict['success'] = True
+
+    with self.test_pipeline as p:
+      results = (p | beam.Create([input_dict]) | DicomSearch())
+      assert_that(results, equal_to([expected_dict]), label='all search assert')
+
+  @attr('IT')
+  def test_dicom_search_refined_instances(self):
+    # Refine search and compare the metadata of a persistent DICOM store.
+    input_dict = {}
+    input_dict['project_id'] = self.project
+    input_dict['region'] = REGION
+    input_dict['dataset_id'] = DATA_SET_ID
+    input_dict['dicom_store_id'] = PERSISTENT_DICOM_STORE_NAME
+    input_dict['search_type'] = "instances"
+    input_dict['params'] = {
+        'StudyInstanceUID': 'study_000000001', 'limit': 500, 'offset': 0
+    }
+
+    expected_dict = {}
+    expected_dict['result'] = self.expected_output_refined_metadata
+    expected_dict['status'] = 200
+    expected_dict['input'] = input_dict
+    expected_dict['success'] = True
+
+    with self.test_pipeline as p:
+      results = (p | beam.Create([input_dict]) | DicomSearch())
+      assert_that(
+          results, equal_to([expected_dict]), label='refine search assert')
+
+  @attr('IT')
+  def test_dicom_store_instance_from_gcs(self):
+    # Store DICOM files to an empty DICOM store from a GCS bucket,
+    # then check that the store metadata matches.
+    input_dict_store = {}
+    input_dict_store['project_id'] = self.project
+    input_dict_store['region'] = REGION
+    input_dict_store['dataset_id'] = DATA_SET_ID
+    input_dict_store['dicom_store_id'] = self.temp_dicom_store
+
+    expected_output = [True] * NUM_INSTANCE
+
+    with self.test_pipeline as p:
+      gcs_path = TEMP_FILES_PATH + "/dicom_files/*"
+      results = (
+          p
+          | fileio.MatchFiles(gcs_path)
+          | fileio.ReadMatches()
+          | UploadToDicomStore(input_dict_store, 'fileio')
+          | beam.Map(lambda x: x['success']))
+      assert_that(
+          results, equal_to(expected_output), label='store first assert')
+
+    input_dict_search = {}
+    input_dict_search['project_id'] = self.project
+    input_dict_search['region'] = REGION
+    input_dict_search['dataset_id'] = DATA_SET_ID
+    input_dict_search['dicom_store_id'] = self.temp_dicom_store
+    input_dict_search['search_type'] = "instances"
+
+    expected_dict = {}
+    expected_dict['result'] = self.expected_output_all_metadata
+    expected_dict['status'] = 200
+    expected_dict['input'] = input_dict_search
+    expected_dict['success'] = True
+

Review comment:
       Pipeline reduced







[GitHub] [beam] pabloem commented on pull request #12473: [BEAM-10601] DICOM API Beam IO connector e2e test

Posted by GitBox <gi...@apache.org>.
pabloem commented on pull request #12473:
URL: https://github.com/apache/beam/pull/12473#issuecomment-670680825


   Okay, py37lint is passing on my machine. I'll just run postcommit, and we should be good to go.





[GitHub] [beam] George-Wu commented on a change in pull request #12473: [BEAM-10601] DICOM API Beam IO connector e2e test

Posted by GitBox <gi...@apache.org>.
George-Wu commented on a change in pull request #12473:
URL: https://github.com/apache/beam/pull/12473#discussion_r466104661



##########
File path: sdks/python/apache_beam/io/gcp/dicomio_integration_test.py
##########
@@ -0,0 +1,155 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Integration test for Google Cloud DICOM IO connector.
+"""
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import json
+import time
+import unittest
+
+from google.auth import default
+from google.auth.transport import requests
+from google.cloud import storage
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.io import fileio
+from apache_beam.io.gcp.dicomio import DicomSearch
+from apache_beam.io.gcp.dicomio import UploadToDicomStore
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+REGION = 'us-central1'
+DATA_SET_ID = 'apache-beam-integration-testing'
+HEALTHCARE_BASE_URL = 'https://healthcare.googleapis.com/v1'
+PERSISTENT_DICOM_STORE_NAME = "dicom_it_persistent_store"
+DICOM_FILES_PATH = "gs://temp-storage-for-dicom-io-tests/dicom_files"
+NUM_INSTANCE = 18
+
+
+def create_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Create an empty DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+      HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores".format(api_endpoint, dataset_id)
+
+  response = session.post(
+      dicomweb_path, params={"dicomStoreId": dicom_store_id})
+  response.raise_for_status()
+  return response.status_code
+
+
+def delete_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Delete an existing DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+      HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores/{}".format(
+      api_endpoint, dataset_id, dicom_store_id)
+
+  response = session.delete(dicomweb_path)
+  response.raise_for_status()
+  return response.status_code
+
+
+class DICOMIoIntegrationTest(unittest.TestCase):
+  expected_output_metadata = None
+
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.project = self.test_pipeline.get_option('project')
+
+    # create a temp Dicom store based on the time stamp
+    self.temp_dicom_store = "DICOM_store_" + str(time.time())
+    create_dicom_store(self.project, DATA_SET_ID, REGION, self.temp_dicom_store)
+    client = storage.Client()
+    bucket = client.get_bucket('temp-storage-for-dicom-io-tests')
+    blob = bucket.blob('meta_data_json/Dicom_io_it_test_data.json')
+    data = json.loads(blob.download_as_string())
+    self.expected_output_metadata = data
+
+  def tearDown(self):
+    # clean up the temp Dicom store
+    delete_dicom_store(self.project, DATA_SET_ID, REGION, self.temp_dicom_store)
+
+  @attr('IT')
+  def test_dicom_search(self):

Review comment:
       test case added







[GitHub] [beam] pabloem commented on a change in pull request #12473: [BEAM-10601] DICOM API Beam IO connector e2e test

Posted by GitBox <gi...@apache.org>.
pabloem commented on a change in pull request #12473:
URL: https://github.com/apache/beam/pull/12473#discussion_r466559724



##########
File path: sdks/python/apache_beam/io/gcp/dicomio_integration_test.py
##########
@@ -0,0 +1,215 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Integration test for Google Cloud DICOM IO connector.
+"""
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import random
+import string
+import unittest
+
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.io import fileio
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from apache_beam.io.gcp.dicomio import DicomSearch
+  from apache_beam.io.gcp.dicomio import UploadToDicomStore
+  from google.auth import default
+  from google.auth.transport import requests
+except ImportError:
+  DicomSearch = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+REGION = 'us-central1'
+DATA_SET_ID = 'apache-beam-integration-testing'
+HEALTHCARE_BASE_URL = 'https://healthcare.googleapis.com/v1'
+GCS_BASE_URL = 'https://storage.googleapis.com/storage/v1'
+PERSISTENT_DICOM_STORE_NAME = "dicom_it_persistent_store"
+TEMP_BUCKET_NAME = 'temp-storage-for-dicom-io-tests'

Review comment:
       Please make sure that this bucket has lifecycle policies so that any leftover files are erased automatically (or feel free to use any of the other `temp-storage-*` buckets in Beam, which do have lifecycle policies).
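
   A minimal sketch of adding such a policy with the google-cloud-storage client (the bucket name comes from the constant above; the 1-day age is an assumed retention window):
   ```
   from google.cloud import storage

   client = storage.Client()
   bucket = client.get_bucket('temp-storage-for-dicom-io-tests')
   # Auto-delete any object older than 1 day.
   bucket.add_lifecycle_delete_rule(age=1)
   bucket.patch()  # apply the updated lifecycle rules to the bucket
   ```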

##########
File path: sdks/python/apache_beam/io/gcp/dicomio_integration_test.py
##########
@@ -0,0 +1,215 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Integration test for Google Cloud DICOM IO connector.

Review comment:
       Can you please add detailed documentation on how the test works? For example, are any DICOM store instances expected to be running? What files are used for the test? (I see it all gets created, so that's great; just please document all of that here.)
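
   For illustration, the module docstring could capture this, e.g. (details inferred from the constants in this file; adjust as needed):
   ```
   """
   Integration test for Google Cloud DICOM IO connector.

   Prerequisites: a Healthcare dataset 'apache-beam-integration-testing'
   in us-central1, a pre-populated persistent DICOM store
   'dicom_it_persistent_store', and the test DICOM files plus expected
   metadata JSON staged in the 'temp-storage-for-dicom-io-tests' bucket.
   A temporary DICOM store is created in setUp and deleted in tearDown.
   """
   ```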

##########
File path: sdks/python/apache_beam/io/gcp/dicomio_integration_test.py
##########
@@ -0,0 +1,215 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Integration test for Google Cloud DICOM IO connector.
+"""
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import random
+import string
+import unittest
+
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.io import fileio
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from apache_beam.io.gcp.dicomio import DicomSearch
+  from apache_beam.io.gcp.dicomio import UploadToDicomStore
+  from google.auth import default
+  from google.auth.transport import requests
+except ImportError:
+  DicomSearch = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+REGION = 'us-central1'
+DATA_SET_ID = 'apache-beam-integration-testing'
+HEALTHCARE_BASE_URL = 'https://healthcare.googleapis.com/v1'
+GCS_BASE_URL = 'https://storage.googleapis.com/storage/v1'
+PERSISTENT_DICOM_STORE_NAME = "dicom_it_persistent_store"
+TEMP_BUCKET_NAME = 'temp-storage-for-dicom-io-tests'
+TEMP_FILES_PATH = 'gs://' + TEMP_BUCKET_NAME
+META_DATA_ALL_NAME = 'Dicom_io_it_test_data.json'
+META_DATA_REFINED_NAME = 'Dicom_io_it_test_refined_data.json'
+NUM_INSTANCE = 18
+RAND_LEN = 15
+
+
+def random_string_generator(length):
+  letters_and_digits = string.ascii_letters + string.digits
+  result = ''.join((random.choice(letters_and_digits) for i in range(length)))
+  return result
+
+
+def create_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Create an empty DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+      HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores".format(api_endpoint, dataset_id)
+
+  response = session.post(
+      dicomweb_path, params={"dicomStoreId": dicom_store_id})
+  response.raise_for_status()
+  return response.status_code
+
+
+def delete_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Delete an existing DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+      HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores/{}".format(
+      api_endpoint, dataset_id, dicom_store_id)
+
+  response = session.delete(dicomweb_path)
+  response.raise_for_status()
+  return response.status_code
+
+
+def get_gcs_file_http(file_name):
+  # Get a GCS file via the REST API
+  api_endpoint = "{}/b/{}/o/{}?alt=media".format(
+      GCS_BASE_URL, TEMP_BUCKET_NAME, file_name)
+
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+
+  response = session.get(api_endpoint)
+  response.raise_for_status()
+  return response.json()
+
+
+@unittest.skipIf(DicomSearch is None, 'GCP dependencies are not installed')
+class DICOMIoIntegrationTest(unittest.TestCase):
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.project = self.test_pipeline.get_option('project')
+    self.expected_output_all_metadata = get_gcs_file_http(META_DATA_ALL_NAME)
+    self.expected_output_refined_metadata = get_gcs_file_http(
+        META_DATA_REFINED_NAME)
+
+    # create a temp DICOM store with a random name suffix
+    self.temp_dicom_store = "DICOM_store_" + random_string_generator(RAND_LEN)
+    create_dicom_store(self.project, DATA_SET_ID, REGION, self.temp_dicom_store)
+
+  def tearDown(self):
+    # clean up the temp Dicom store
+    delete_dicom_store(self.project, DATA_SET_ID, REGION, self.temp_dicom_store)
+
+  @attr('IT')
+  def test_dicom_search_all_instances(self):

Review comment:
       As you realized, each test takes 5 minutes to run. With that in mind, would it be possible to consolidate the all_instance and refined_instance searches into a single test case?
   
   Something roughly like this:
   ```
   with p:
     refined = p | 'create refined' >> beam.Create([refined_input]) | 'refined search' >> DicomSearch()
     all_meta = p | 'create all' >> beam.Create([all_input]) | 'all search' >> DicomSearch()
     assert_that(refined, equal_to([exp_ref]), label='refined match')
     assert_that(all_meta, equal_to([exp_all]), label='all match')
   ```

##########
File path: sdks/python/apache_beam/io/gcp/dicomio_integration_test.py
##########
@@ -0,0 +1,215 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Integration test for Google Cloud DICOM IO connector.
+"""
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import random
+import string
+import unittest
+
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.io import fileio
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from apache_beam.io.gcp.dicomio import DicomSearch
+  from apache_beam.io.gcp.dicomio import UploadToDicomStore
+  from google.auth import default
+  from google.auth.transport import requests
+except ImportError:
+  DicomSearch = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+REGION = 'us-central1'
+DATA_SET_ID = 'apache-beam-integration-testing'
+HEALTHCARE_BASE_URL = 'https://healthcare.googleapis.com/v1'
+GCS_BASE_URL = 'https://storage.googleapis.com/storage/v1'
+PERSISTENT_DICOM_STORE_NAME = "dicom_it_persistent_store"
+TEMP_BUCKET_NAME = 'temp-storage-for-dicom-io-tests'
+TEMP_FILES_PATH = 'gs://' + TEMP_BUCKET_NAME
+META_DATA_ALL_NAME = 'Dicom_io_it_test_data.json'
+META_DATA_REFINED_NAME = 'Dicom_io_it_test_refined_data.json'
+NUM_INSTANCE = 18
+RAND_LEN = 15
+
+
+def random_string_generator(length):
+  letters_and_digits = string.ascii_letters + string.digits
+  result = ''.join((random.choice(letters_and_digits) for i in range(length)))
+  return result
+
+
+def create_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Create an empty DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+      HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores".format(api_endpoint, dataset_id)
+
+  response = session.post(
+      dicomweb_path, params={"dicomStoreId": dicom_store_id})
+  response.raise_for_status()
+  return response.status_code
+
+
+def delete_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Delete an existing DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+      HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores/{}".format(
+      api_endpoint, dataset_id, dicom_store_id)
+
+  response = session.delete(dicomweb_path)
+  response.raise_for_status()
+  return response.status_code
+
+
+def get_gcs_file_http(file_name):
+  # Get a GCS file via the REST API
+  api_endpoint = "{}/b/{}/o/{}?alt=media".format(
+      GCS_BASE_URL, TEMP_BUCKET_NAME, file_name)
+
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+
+  response = session.get(api_endpoint)
+  response.raise_for_status()
+  return response.json()
+
+
+@unittest.skipIf(DicomSearch is None, 'GCP dependencies are not installed')
+class DICOMIoIntegrationTest(unittest.TestCase):
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.project = self.test_pipeline.get_option('project')
+    self.expected_output_all_metadata = get_gcs_file_http(META_DATA_ALL_NAME)
+    self.expected_output_refined_metadata = get_gcs_file_http(
+        META_DATA_REFINED_NAME)
+
+    # create a temp DICOM store with a random name suffix
+    self.temp_dicom_store = "DICOM_store_" + random_string_generator(RAND_LEN)
+    create_dicom_store(self.project, DATA_SET_ID, REGION, self.temp_dicom_store)
+
+  def tearDown(self):
+    # clean up the temp Dicom store
+    delete_dicom_store(self.project, DATA_SET_ID, REGION, self.temp_dicom_store)
+
+  @attr('IT')
+  def test_dicom_search_all_instances(self):
+    # Search and compare the metadata of a persistent DICOM store.
+    input_dict = {}
+    input_dict['project_id'] = self.project
+    input_dict['region'] = REGION
+    input_dict['dataset_id'] = DATA_SET_ID
+    input_dict['dicom_store_id'] = PERSISTENT_DICOM_STORE_NAME
+    input_dict['search_type'] = "instances"
+
+    expected_dict = {}
+    expected_dict['result'] = self.expected_output_all_metadata
+    expected_dict['status'] = 200
+    expected_dict['input'] = input_dict
+    expected_dict['success'] = True
+
+    with self.test_pipeline as p:
+      results = (p | beam.Create([input_dict]) | DicomSearch())
+      assert_that(results, equal_to([expected_dict]), label='all search assert')
+
+  @attr('IT')
+  def test_dicom_search_refined_instances(self):
+    # Refine search and compare the metadata of a persistent DICOM store.
+    input_dict = {}
+    input_dict['project_id'] = self.project
+    input_dict['region'] = REGION
+    input_dict['dataset_id'] = DATA_SET_ID
+    input_dict['dicom_store_id'] = PERSISTENT_DICOM_STORE_NAME
+    input_dict['search_type'] = "instances"
+    input_dict['params'] = {
+        'StudyInstanceUID': 'study_000000001', 'limit': 500, 'offset': 0
+    }
+
+    expected_dict = {}
+    expected_dict['result'] = self.expected_output_refined_metadata
+    expected_dict['status'] = 200
+    expected_dict['input'] = input_dict
+    expected_dict['success'] = True
+
+    with self.test_pipeline as p:
+      results = (p | beam.Create([input_dict]) | DicomSearch())
+      assert_that(
+          results, equal_to([expected_dict]), label='refine search assert')
+
+  @attr('IT')
+  def test_dicom_store_instance_from_gcs(self):
+    # Store DICOM files to an empty DICOM store from a GCS bucket,
+    # then check that the store metadata matches.
+    input_dict_store = {}
+    input_dict_store['project_id'] = self.project
+    input_dict_store['region'] = REGION
+    input_dict_store['dataset_id'] = DATA_SET_ID
+    input_dict_store['dicom_store_id'] = self.temp_dicom_store
+
+    expected_output = [True] * NUM_INSTANCE
+
+    with self.test_pipeline as p:
+      gcs_path = TEMP_FILES_PATH + "/dicom_files/*"
+      results = (
+          p
+          | fileio.MatchFiles(gcs_path)
+          | fileio.ReadMatches()
+          | UploadToDicomStore(input_dict_store, 'fileio')
+          | beam.Map(lambda x: x['success']))
+      assert_that(
+          results, equal_to(expected_output), label='store first assert')
+
+    input_dict_search = {}
+    input_dict_search['project_id'] = self.project
+    input_dict_search['region'] = REGION
+    input_dict_search['dataset_id'] = DATA_SET_ID
+    input_dict_search['dicom_store_id'] = self.temp_dicom_store
+    input_dict_search['search_type'] = "instances"
+
+    expected_dict = {}
+    expected_dict['result'] = self.expected_output_all_metadata
+    expected_dict['status'] = 200
+    expected_dict['input'] = input_dict_search
+    expected_dict['success'] = True
+

Review comment:
       In this test case you're running two pipelines, so it will take 10 minutes to run. Perhaps it's worth making this the single test case that tests both upload and search (or removing the search pipeline from this case to avoid the very long runtime?)
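
   For example, dropping the search pipeline would leave just the upload assert (a sketch based on the code above):
   ```
   with self.test_pipeline as p:
     gcs_path = TEMP_FILES_PATH + "/dicom_files/*"
     results = (
         p
         | fileio.MatchFiles(gcs_path)
         | fileio.ReadMatches()
         | UploadToDicomStore(input_dict_store, 'fileio')
         | beam.Map(lambda x: x['success']))
     assert_that(results, equal_to([True] * NUM_INSTANCE), label='store assert')
   ```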







[GitHub] [beam] George-Wu commented on pull request #12473: [BEAM-10601] DICOM API Beam IO connector e2e test

Posted by GitBox <gi...@apache.org>.
George-Wu commented on pull request #12473:
URL: https://github.com/apache/beam/pull/12473#issuecomment-670660458


   Run PythonLint PreCommit





[GitHub] [beam] tvalentyn commented on pull request #12473: [BEAM-10601] DICOM API Beam IO connector e2e test

Posted by GitBox <gi...@apache.org>.
tvalentyn commented on pull request #12473:
URL: https://github.com/apache/beam/pull/12473#issuecomment-912844670


   Would the authors of this test by chance be available to look at the test failure in https://issues.apache.org/jira/browse/BEAM-12826?





[GitHub] [beam] DanKotowski commented on a change in pull request #12473: [BEAM-10601] DICOM API Beam IO connector e2e test

Posted by GitBox <gi...@apache.org>.
DanKotowski commented on a change in pull request #12473:
URL: https://github.com/apache/beam/pull/12473#discussion_r466463132



##########
File path: sdks/python/apache_beam/io/gcp/dicomio_integration_test.py
##########
@@ -0,0 +1,155 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Integration test for Google Cloud DICOM IO connector.
+"""
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import json
+import time
+import unittest
+
+from google.auth import default
+from google.auth.transport import requests
+from google.cloud import storage
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.io import fileio
+from apache_beam.io.gcp.dicomio import DicomSearch
+from apache_beam.io.gcp.dicomio import UploadToDicomStore
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+REGION = 'us-central1'
+DATA_SET_ID = 'apache-beam-integration-testing'
+HEALTHCARE_BASE_URL = 'https://healthcare.googleapis.com/v1'
+PERSISTENT_DICOM_STORE_NAME = "dicom_it_persistent_store"
+DICOM_FILES_PATH = "gs://temp-storage-for-dicom-io-tests/dicom_files"
+NUM_INSTANCE = 18
+
+
+def create_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Create an empty DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+      HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores".format(api_endpoint, dataset_id)
+
+  response = session.post(
+      dicomweb_path, params={"dicomStoreId": dicom_store_id})
+  response.raise_for_status()
+  return response.status_code
+
+
+def delete_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Delete an existing DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+      HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores/{}".format(
+      api_endpoint, dataset_id, dicom_store_id)
+
+  response = session.delete(dicomweb_path)
+  response.raise_for_status()
+  return response.status_code
+
+
+class DICOMIoIntegrationTest(unittest.TestCase):

Review comment:
       Alright, we can leave that for now, then. I think the test that we have is a good start and gives us basic coverage for the connector.







[GitHub] [beam] George-Wu commented on a change in pull request #12473: [BEAM-10601] DICOM API Beam IO connector e2e test

Posted by GitBox <gi...@apache.org>.
George-Wu commented on a change in pull request #12473:
URL: https://github.com/apache/beam/pull/12473#discussion_r466109042



##########
File path: sdks/python/apache_beam/io/gcp/dicomio_integration_test.py
##########
@@ -0,0 +1,155 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Integration test for Google Cloud DICOM IO connector.
+"""
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import json
+import time
+import unittest
+
+from google.auth import default
+from google.auth.transport import requests
+from google.cloud import storage
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.io import fileio
+from apache_beam.io.gcp.dicomio import DicomSearch
+from apache_beam.io.gcp.dicomio import UploadToDicomStore
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+REGION = 'us-central1'
+DATA_SET_ID = 'apache-beam-integration-testing'
+HEALTHCARE_BASE_URL = 'https://healthcare.googleapis.com/v1'
+PERSISTENT_DICOM_STORE_NAME = "dicom_it_persistent_store"
+DICOM_FILES_PATH = "gs://temp-storage-for-dicom-io-tests/dicom_files"
+NUM_INSTANCE = 18
+
+
+def create_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Create an empty DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+      HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores".format(api_endpoint, dataset_id)
+
+  response = session.post(
+      dicomweb_path, params={"dicomStoreId": dicom_store_id})
+  response.raise_for_status()
+  return response.status_code
+
+
+def delete_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Delete an existing DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+      HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores/{}".format(
+      api_endpoint, dataset_id, dicom_store_id)
+
+  response = session.delete(dicomweb_path)
+  response.raise_for_status()
+  return response.status_code
+
+
+class DICOMIoIntegrationTest(unittest.TestCase):

Review comment:
       I think a Pub/Sub test is unnecessary here, because our connector neither depends on nor invokes Pub/Sub directly. The 'pubsub to search metadata' converter is already covered by the unit tests, while the read and write operations can be tested more easily here without Pub/Sub. What's more, testing a streaming pipeline is complicated and would need much more work :(







[GitHub] [beam] George-Wu commented on a change in pull request #12473: [BEAM-10601] DICOM API Beam IO connector e2e test

Posted by GitBox <gi...@apache.org>.
George-Wu commented on a change in pull request #12473:
URL: https://github.com/apache/beam/pull/12473#discussion_r466104978



##########
File path: sdks/python/apache_beam/io/gcp/dicomio_integration_test.py
##########
@@ -0,0 +1,155 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Integration test for Google Cloud DICOM IO connector.
+"""
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import json
+import time
+import unittest
+
+from google.auth import default
+from google.auth.transport import requests
+from google.cloud import storage
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.io import fileio
+from apache_beam.io.gcp.dicomio import DicomSearch
+from apache_beam.io.gcp.dicomio import UploadToDicomStore
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+REGION = 'us-central1'
+DATA_SET_ID = 'apache-beam-integration-testing'
+HEALTHCARE_BASE_URL = 'https://healthcare.googleapis.com/v1'
+PERSISTENT_DICOM_STORE_NAME = "dicom_it_persistent_store"
+DICOM_FILES_PATH = "gs://temp-storage-for-dicom-io-tests/dicom_files"
+NUM_INSTANCE = 18
+
+
+def create_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Create an empty DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+      HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores".format(api_endpoint, dataset_id)
+
+  response = session.post(
+      dicomweb_path, params={"dicomStoreId": dicom_store_id})
+  response.raise_for_status()
+  return response.status_code
+
+
+def delete_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Delete an existing DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+      HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores/{}".format(
+      api_endpoint, dataset_id, dicom_store_id)
+
+  response = session.delete(dicomweb_path)
+  response.raise_for_status()
+  return response.status_code
+
+
+class DICOMIoIntegrationTest(unittest.TestCase):
+  expected_output_metadata = None
+
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.project = self.test_pipeline.get_option('project')
+
+    # create a temp Dicom store based on the time stamp
+    self.temp_dicom_store = "DICOM_store_" + str(time.time())
+    create_dicom_store(self.project, DATA_SET_ID, REGION, self.temp_dicom_store)
+    client = storage.Client()
+    bucket = client.get_bucket('temp-storage-for-dicom-io-tests')
+    blob = bucket.blob('meta_data_json/Dicom_io_it_test_data.json')
+    data = json.loads(blob.download_as_string())
+    self.expected_output_metadata = data
+
+  def tearDown(self):
+    # clean up the temp Dicom store
+    delete_dicom_store(self.project, DATA_SET_ID, REGION, self.temp_dicom_store)
+
+  @attr('IT')
+  def test_dicom_search(self):
+    # Search and compare the metadata of a persistent DICOM store.
+    input_dict = {}
+    input_dict['project_id'] = self.project
+    input_dict['region'] = REGION
+    input_dict['dataset_id'] = DATA_SET_ID
+    input_dict['dicom_store_id'] = PERSISTENT_DICOM_STORE_NAME
+    input_dict['search_type'] = "instances"
+
+    expected_dict = {}
+    expected_dict['result'] = self.expected_output_metadata
+    expected_dict['status'] = 200
+    expected_dict['input'] = input_dict
+    expected_dict['success'] = True
+
+    with TestPipeline() as p:
+      results = (p | beam.Create([input_dict]) | DicomSearch())
+      assert_that(results, equal_to([expected_dict]))
+
+  @attr('IT')
+  def test_dicom_store_instance(self):
+    # Store DICOM files to an empty DICOM store from a GCS bucket,
+    # then check that the store metadata matches.
+    input_dict = {}
+    input_dict['project_id'] = self.project
+    input_dict['region'] = REGION
+    input_dict['dataset_id'] = DATA_SET_ID
+    input_dict['dicom_store_id'] = self.temp_dicom_store
+    input_dict['search_type'] = "instances"
+
+    expected_dict = {}
+    expected_dict['result'] = self.expected_output_metadata
+    expected_dict['status'] = 200
+    expected_dict['input'] = input_dict
+    expected_dict['success'] = True
+
+    with TestPipeline() as p:
+      gcs_path = DICOM_FILES_PATH + "/*"
+      results = (
+          p
+          | fileio.MatchFiles(gcs_path)
+          | fileio.ReadMatches()
+          | UploadToDicomStore(input_dict, 'fileio')

Review comment:
       Changed







[GitHub] [beam] George-Wu commented on a change in pull request #12473: [BEAM-10601] DICOM API Beam IO connector e2e test

Posted by GitBox <gi...@apache.org>.
George-Wu commented on a change in pull request #12473:
URL: https://github.com/apache/beam/pull/12473#discussion_r466736799



##########
File path: sdks/python/apache_beam/io/gcp/dicomio_integration_test.py
##########
@@ -0,0 +1,215 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Integration test for Google Cloud DICOM IO connector.
+"""
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import random
+import string
+import unittest
+
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.io import fileio
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from apache_beam.io.gcp.dicomio import DicomSearch
+  from apache_beam.io.gcp.dicomio import UploadToDicomStore
+  from google.auth import default
+  from google.auth.transport import requests
+except ImportError:
+  DicomSearch = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+REGION = 'us-central1'
+DATA_SET_ID = 'apache-beam-integration-testing'
+HEALTHCARE_BASE_URL = 'https://healthcare.googleapis.com/v1'
+GCS_BASE_URL = 'https://storage.googleapis.com/storage/v1'
+PERSISTENT_DICOM_STORE_NAME = "dicom_it_persistent_store"
+TEMP_BUCKET_NAME = 'temp-storage-for-dicom-io-tests'

Review comment:
       Changed to the right storage bucket







[GitHub] [beam] George-Wu commented on a change in pull request #12473: [BEAM-10601] DICOM API Beam IO connector e2e test

Posted by GitBox <gi...@apache.org>.
George-Wu commented on a change in pull request #12473:
URL: https://github.com/apache/beam/pull/12473#discussion_r466104754



##########
File path: sdks/python/apache_beam/io/gcp/dicomio_integration_test.py
##########
@@ -0,0 +1,155 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Integration test for Google Cloud DICOM IO connector.
+"""
+# pytype: skip-file
+
+from __future__ import absolute_import
+
+import json
+import time
+import unittest
+
+from google.auth import default
+from google.auth.transport import requests
+from google.cloud import storage
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.io import fileio
+from apache_beam.io.gcp.dicomio import DicomSearch
+from apache_beam.io.gcp.dicomio import UploadToDicomStore
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+REGION = 'us-central1'
+DATA_SET_ID = 'apache-beam-integration-testing'
+HEALTHCARE_BASE_URL = 'https://healthcare.googleapis.com/v1'
+PERSISTENT_DICOM_STORE_NAME = "dicom_it_persistent_store"
+DICOM_FILES_PATH = "gs://temp-storage-for-dicom-io-tests/dicom_files"
+NUM_INSTANCE = 18
+
+
+def create_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Create an empty DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+      HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores".format(api_endpoint, dataset_id)
+
+  response = session.post(
+      dicomweb_path, params={"dicomStoreId": dicom_store_id})
+  response.raise_for_status()
+  return response.status_code
+
+
+def delete_dicom_store(project_id, dataset_id, region, dicom_store_id):
+  # Delete an existing DICOM store
+  credential, _ = default()
+  session = requests.AuthorizedSession(credential)
+  api_endpoint = "{}/projects/{}/locations/{}".format(
+      HEALTHCARE_BASE_URL, project_id, region)
+
+  # base of dicomweb path.
+  dicomweb_path = "{}/datasets/{}/dicomStores/{}".format(
+      api_endpoint, dataset_id, dicom_store_id)
+
+  response = session.delete(dicomweb_path)
+  response.raise_for_status()
+  return response.status_code
+
+
+class DICOMIoIntegrationTest(unittest.TestCase):
+  expected_output_metadata = None
+
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+    self.project = self.test_pipeline.get_option('project')
+
+    # create a temp Dicom store based on the time stamp
+    self.temp_dicom_store = "DICOM_store_" + str(time.time())
+    create_dicom_store(self.project, DATA_SET_ID, REGION, self.temp_dicom_store)
+    client = storage.Client()
+    bucket = client.get_bucket('temp-storage-for-dicom-io-tests')
+    blob = bucket.blob('meta_data_json/Dicom_io_it_test_data.json')
+    data = json.loads(blob.download_as_string())
+    self.expected_output_metadata = data
+
+  def tearDown(self):
+    # clean up the temp Dicom store
+    delete_dicom_store(self.project, DATA_SET_ID, REGION, self.temp_dicom_store)
+
+  @attr('IT')
+  def test_dicom_search(self):
+    # Search and compare the metadata of a persistent DICOM store.
+    input_dict = {}
+    input_dict['project_id'] = self.project
+    input_dict['region'] = REGION
+    input_dict['dataset_id'] = DATA_SET_ID
+    input_dict['dicom_store_id'] = PERSISTENT_DICOM_STORE_NAME
+    input_dict['search_type'] = "instances"
+
+    expected_dict = {}
+    expected_dict['result'] = self.expected_output_metadata
+    expected_dict['status'] = 200
+    expected_dict['input'] = input_dict
+    expected_dict['success'] = True
+
+    with TestPipeline() as p:
+      results = (p | beam.Create([input_dict]) | DicomSearch())
+      assert_that(results, equal_to([expected_dict]))
+
+  @attr('IT')
+  def test_dicom_store_instance(self):
+    # Store DICOM files to an empty DICOM store from a GCS bucket,
+    # then check that the store metadata matches.
+    input_dict = {}
+    input_dict['project_id'] = self.project
+    input_dict['region'] = REGION
+    input_dict['dataset_id'] = DATA_SET_ID
+    input_dict['dicom_store_id'] = self.temp_dicom_store
+    input_dict['search_type'] = "instances"
+
+    expected_dict = {}
+    expected_dict['result'] = self.expected_output_metadata
+    expected_dict['status'] = 200
+    expected_dict['input'] = input_dict
+    expected_dict['success'] = True
+
+    with TestPipeline() as p:
+      gcs_path = DICOM_FILES_PATH + "/*"
+      results = (
+          p
+          | fileio.MatchFiles(gcs_path)
+          | fileio.ReadMatches()
+          | UploadToDicomStore(input_dict, 'fileio')
+          | beam.Map(lambda x: x['success']))
+      assert_that(results, equal_to([True] * NUM_INSTANCE))

Review comment:
       Changed



