You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/12/05 20:29:00 UTC

[jira] [Commented] (BEAM-2774) Add I/O source for VCF files (python)

    [ https://issues.apache.org/jira/browse/BEAM-2774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16279144#comment-16279144 ] 

ASF GitHub Bot commented on BEAM-2774:
--------------------------------------

chamikaramj closed pull request #4157: [BEAM-2774] Added loose failure mode to allow individual VCF record reads to fail
URL: https://github.com/apache/beam/pull/4157
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/io/vcfio.py b/sdks/python/apache_beam/io/vcfio.py
index b877a32d01b..80f4631e462 100644
--- a/sdks/python/apache_beam/io/vcfio.py
+++ b/sdks/python/apache_beam/io/vcfio.py
@@ -22,6 +22,8 @@
 
 from __future__ import absolute_import
 
+import logging
+import traceback
 from collections import namedtuple
 
 import vcf
@@ -33,8 +35,8 @@
 from apache_beam.io.textio import _TextSource as TextSource
 from apache_beam.transforms import PTransform
 
-__all__ = ['ReadFromVcf', 'Variant', 'VariantCall', 'VariantInfo']
-
+__all__ = ['ReadFromVcf', 'Variant', 'VariantCall', 'VariantInfo',
+           'MalformedVcfRecord']
 
 # Stores data about variant INFO fields. The type of 'data' is specified in the
 # VCF headers. 'field_count' is a string that specifies the number of fields
@@ -45,6 +47,10 @@
 #   - 'G': one value for each possible genotype.
 #   - 'R': one value for each possible allele (including the reference).
 VariantInfo = namedtuple('VariantInfo', ['data', 'field_count'])
+# Stores data about failed VCF record reads. `line` is the text line that
+# caused the failed read and `file_name` is the name of the file that the read
+# failed in.
+MalformedVcfRecord = namedtuple('MalformedVcfRecord', ['file_name', 'line'])
 MISSING_FIELD_VALUE = '.'  # Indicates field is missing in VCF record.
 PASS_FILTER = 'PASS'  # Indicates that all filters have been passed.
 END_INFO_KEY = 'END'  # The info key that explicitly specifies end of a record.
@@ -223,7 +229,8 @@ def __init__(self,
                file_pattern,
                compression_type=CompressionTypes.AUTO,
                buffer_size=DEFAULT_VCF_READ_BUFFER_SIZE,
-               validate=True):
+               validate=True,
+               allow_malformed_records=False):
     super(_VcfSource, self).__init__(file_pattern,
                                      compression_type=compression_type,
                                      validate=validate)
@@ -231,6 +238,7 @@ def __init__(self,
     self._header_lines_per_file = {}
     self._compression_type = compression_type
     self._buffer_size = buffer_size
+    self._allow_malformed_records = allow_malformed_records
 
   def read_records(self, file_name, range_tracker):
     record_iterator = _VcfSource._VcfRecordIterator(
@@ -238,6 +246,7 @@ def read_records(self, file_name, range_tracker):
         range_tracker,
         self._pattern,
         self._compression_type,
+        self._allow_malformed_records,
         buffer_size=self._buffer_size,
         skip_header_lines=0)
 
@@ -253,10 +262,12 @@ def __init__(self,
                  range_tracker,
                  file_pattern,
                  compression_type,
+                 allow_malformed_records,
                  **kwargs):
       self._header_lines = []
       self._last_record = None
       self._file_name = file_name
+      self._allow_malformed_records = allow_malformed_records
 
       text_source = TextSource(
           file_pattern,
@@ -274,7 +285,9 @@ def __init__(self,
       try:
         self._vcf_reader = vcf.Reader(fsock=self._create_generator())
       except SyntaxError as e:
-        raise ValueError('Invalid VCF header %s' % str(e))
+        raise ValueError('An exception was raised when reading header from VCF '
+                         'file %s: %s' % (self._file_name,
+                                          traceback.format_exc(e)))
 
     def _store_header_lines(self, header_lines):
       self._header_lines = header_lines
@@ -301,7 +314,18 @@ def next(self):
         return self._convert_to_variant_record(record, self._vcf_reader.infos,
                                                self._vcf_reader.formats)
       except (LookupError, ValueError) as e:
-        raise ValueError('Invalid record in VCF file. Error: %s' % str(e))
+        if self._allow_malformed_records:
+          logging.warning(
+              'An exception was raised when reading record from VCF file '
+              '%s. Invalid record was %s: %s',
+              self._file_name, self._last_record, traceback.format_exc(e))
+          return MalformedVcfRecord(self._file_name, self._last_record)
+
+        raise ValueError('An exception was raised when reading record from VCF '
+                         'file %s. Invalid record was %s: %s' % (
+                             self._file_name,
+                             self._last_record,
+                             traceback.format_exc(e)))
 
     def _convert_to_variant_record(self, record, infos, formats):
       """Converts the PyVCF record to a :class:`Variant` object.
@@ -407,7 +431,7 @@ class ReadFromVcf(PTransform):
   Parses VCF files (version 4) using PyVCF library. If file_pattern specifies
   multiple files, then the header from each file is used separately to parse
   the content. However, the output will be a PCollection of
-  :class:`Variant` objects.
+  :class:`Variant` (or :class:`MalformedVcfRecord` for failed reads) objects.
   """
 
   def __init__(
@@ -415,6 +439,7 @@ def __init__(
       file_pattern=None,
       compression_type=CompressionTypes.AUTO,
       validate=True,
+      allow_malformed_records=False,
       **kwargs):
     """Initialize the :class:`ReadFromVcf` transform.
 
@@ -427,10 +452,17 @@ def __init__(
         underlying file_path's extension will be used to detect the compression.
       validate (bool): flag to verify that the files exist during the pipeline
         creation time.
+      allow_malformed_records (bool): determines if failed VCF
+        record reads will be tolerated. Failed record reads will result in a
+        :class:`MalformedVcfRecord` being returned from the read of the record
+        rather than a :class:`Variant`.
     """
     super(ReadFromVcf, self).__init__(**kwargs)
     self._source = _VcfSource(
-        file_pattern, compression_type, validate=validate)
+        file_pattern,
+        compression_type,
+        validate=validate,
+        allow_malformed_records=allow_malformed_records)
 
   def expand(self, pvalue):
     return pvalue.pipeline | Read(self._source)
diff --git a/sdks/python/apache_beam/io/vcfio_test.py b/sdks/python/apache_beam/io/vcfio_test.py
index 871b6e9c8c0..7ff16d49b06 100644
--- a/sdks/python/apache_beam/io/vcfio_test.py
+++ b/sdks/python/apache_beam/io/vcfio_test.py
@@ -20,12 +20,14 @@
 import logging
 import os
 import unittest
+from itertools import chain
 from itertools import permutations
 
 import apache_beam.io.source_test_utils as source_test_utils
 from apache_beam.io.vcfio import _VcfSource as VcfSource
 from apache_beam.io.vcfio import DEFAULT_PHASESET_VALUE
 from apache_beam.io.vcfio import MISSING_GENOTYPE_VALUE
+from apache_beam.io.vcfio import MalformedVcfRecord
 from apache_beam.io.vcfio import ReadFromVcf
 from apache_beam.io.vcfio import Variant
 from apache_beam.io.vcfio import VariantCall
@@ -95,8 +97,9 @@ class VcfSourceTest(unittest.TestCase):
   def _create_temp_vcf_file(self, lines, tempdir):
     return tempdir.create_temp_file(suffix='.vcf', lines=lines)
 
-  def _read_records(self, file_or_pattern):
-    return source_test_utils.read_from_source(VcfSource(file_or_pattern))
+  def _read_records(self, file_or_pattern, **kwargs):
+    return source_test_utils.read_from_source(
+        VcfSource(file_or_pattern, **kwargs))
 
   def _create_temp_file_and_read_records(self, lines):
     with TempDir() as tempdir:
@@ -177,6 +180,56 @@ def _get_sample_variant_3(self):
                     info={'GQ': None}))
     return variant, vcf_line
 
+  def _get_invalid_file_contents(self):
+    """Gets sample invalid file contents.
+
+    Returns:
+       A `tuple` where the first element is contents that are invalid because
+       of record errors and the second element is contents that are invalid
+       because of header errors.
+    """
+    malformed_vcf_records = [
+        # Malformed record.
+        [
+            '#CHROM	POS	ID	REF	ALT	QUAL	FILTER	INFO	FORMAT	Sample\n',
+            '1    1  '
+        ],
+        # Missing "GT:GQ" format, but GQ is provided.
+        [
+            '#CHROM	POS	ID	REF	ALT	QUAL	FILTER	INFO	FORMAT	Sample\n',
+            '19	123	rs12345	T	C	50	q10	AF=0.2;NS=2	GT	1|0:48'
+        ],
+        # GT is not an integer.
+        [
+            '#CHROM	POS	ID	REF	ALT	QUAL	FILTER	INFO	FORMAT	Sample\n',
+            '19	123	rs12345	T	C	50	q10	AF=0.2;NS=2	GT	A|0'
+        ],
+        # POS should be an integer.
+        [
+            '##FILTER=<ID=PASS,Description="All filters passed">\n',
+            '##FILTER=<ID=q10,Description="Quality is less than 10.">\n',
+            '#CHROM	POS	ID	REF	ALT	QUAL	FILTER	INFO	FORMAT	Sample\n',
+            '19	abc	rs12345	T	C	9	q10	AF=0.2;NS=2	GT:GQ	1|0:48\n',
+        ]
+    ]
+    malformed_header_lines = [
+        # Malformed FILTER.
+        [
+            '##FILTER=<ID=PASS,Description="All filters passed">\n',
+            '##FILTER=<ID=LowQual,Descri\n',
+            '#CHROM	POS	ID	REF	ALT	QUAL	FILTER	INFO	FORMAT	Sample\n',
+            '19	123	rs12345	T	C	50	q10	AF=0.2;NS=2	GT:GQ	1|0:48',
+        ],
+        # Invalid Number value for INFO.
+        [
+            '##INFO=<ID=G,Number=U,Type=String,Description="InvalidNumber">\n',
+            '#CHROM	POS	ID	REF	ALT	QUAL	FILTER	INFO	FORMAT	Sample\n',
+            '19	123	rs12345	T	C	50	q10	AF=0.2;NS=2	GT:GQ	1|0:48\n',
+        ]
+    ]
+
+    return (malformed_vcf_records, malformed_header_lines)
+
   def test_sort_variants(self):
     sorted_variants = [
         Variant(reference_name='a', start=20, end=22),
@@ -286,59 +339,33 @@ def test_read_after_splitting(self):
     self.assertEqual(9882, len(split_records))
 
   def test_invalid_file(self):
-    invalid_file_contents = [
-        # Malfromed record.
-        [
-            '#CHROM	POS	ID	REF	ALT	QUAL	FILTER	INFO	FORMAT	SampleName\n',
-            '1    1  '
-        ],
-        # Missing "GT:GQ" format, but GQ is provided.
-        [
-            '#CHROM	POS	ID	REF	ALT	QUAL	FILTER	INFO	FORMAT	SampleName\n',
-            '19	123	rs12345	T	C	50	q10	AF=0.2;NS=2	GT	1|0:48'
-        ],
-        # GT is not an integer.
-        [
-            '#CHROM	POS	ID	REF	ALT	QUAL	FILTER	INFO	FORMAT	SampleName\n',
-            '19	123	rs12345	T	C	50	q10	AF=0.2;NS=2	GT	A|0'
-        ],
-        # Malformed FILTER.
-        [
-            '##FILTER=<ID=PASS,Description="All filters passed">\n',
-            '##FILTER=<ID=LowQual,Descri\n',
-            '#CHROM	POS	ID	REF	ALT	QUAL	FILTER	INFO	FORMAT	SampleName\n',
-            '19	123	rs12345	T	C	50	q10	AF=0.2;NS=2	GT:GQ	1|0:48',
-        ],
-        # Invalid Number value for INFO.
-        [
-            '##INFO=<ID=G,Number=U,Type=String,Description="InvalidNumber">\n',
-            '#CHROM	POS	ID	REF	ALT	QUAL	FILTER	INFO	FORMAT	SampleName\n',
-            '19	123	rs12345	T	C	50	q10	AF=0.2;NS=2	GT:GQ	1|0:48\n',
-        ],
-        # POS should be an integer.
-        [
-            '##FILTER=<ID=PASS,Description="All filters passed">\n',
-            '##FILTER=<ID=LowQual,Descri\n',
-            '#CHROM	POS	ID	REF	ALT	QUAL	FILTER	INFO	FORMAT	SampleName\n',
-            '19	abc	rs12345	T	C	50	q10	AF=0.2;NS=2	GT:GQ	1|0:48\n',
-        ],
-    ]
-    for content in invalid_file_contents:
-      try:
-        with TempDir() as tempdir:
-          self._read_records(self._create_temp_vcf_file(content, tempdir))
-          self.fail('Invalid VCF file must throw an exception')
-      except ValueError:
-        pass
+    invalid_file_contents = self._get_invalid_file_contents()
+    for content in chain(*invalid_file_contents):
+      with TempDir() as tempdir, self.assertRaises(ValueError):
+        self._read_records(self._create_temp_vcf_file(content, tempdir))
     # Try with multiple files (any one of them will throw an exception).
-    with TempDir() as tempdir:
-      for content in invalid_file_contents:
+    with TempDir() as tempdir, self.assertRaises(ValueError):
+      for content in chain(*invalid_file_contents):
         self._create_temp_vcf_file(content, tempdir)
-      try:
-        self._read_records(os.path.join(tempdir.get_path(), '*.vcf'))
-        self.fail('Invalid VCF file must throw an exception.')
-      except ValueError:
-        pass
+      self._read_records(os.path.join(tempdir.get_path(), '*.vcf'))
+
+  def test_allow_malformed_records(self):
+    invalid_records, invalid_headers = self._get_invalid_file_contents()
+
+    # Invalid records should not raise errors
+    for content in invalid_records:
+      with TempDir() as tempdir:
+        records = self._read_records(
+            self._create_temp_vcf_file(content, tempdir),
+            allow_malformed_records=True)
+        for record in records:
+          self.assertIsInstance(record, MalformedVcfRecord)
+
+    # Invalid headers should still raise errors
+    for content in invalid_headers:
+      with TempDir() as tempdir, self.assertRaises(ValueError):
+        self._read_records(self._create_temp_vcf_file(content, tempdir),
+                           allow_malformed_records=True)
 
   def test_no_samples(self):
     header_line = '#CHROM	POS	ID	REF	ALT	QUAL	FILTER	INFO\n'


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Add I/O source for VCF files (python)
> -------------------------------------
>
>                 Key: BEAM-2774
>                 URL: https://issues.apache.org/jira/browse/BEAM-2774
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-py-core
>            Reporter: Asha Rostamianfar
>            Assignee: Miles Saul
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> A new I/O source for reading (and eventually writing) VCF files [1] for Python. The design doc is available at https://docs.google.com/document/d/1jsdxOPALYYlhnww2NLURS8NKXaFyRSJrcGbEDpY9Lkw/edit
> [1] http://samtools.github.io/hts-specs/VCFv4.3.pdf



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)