You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2018/07/16 16:18:49 UTC
[beam] branch master updated: [BEAM-2810] fastavro integration test
(#5862)
This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 01d8b92 [BEAM-2810] fastavro integration test (#5862)
01d8b92 is described below
commit 01d8b922fbe355bc6818deebf83732ca20f79bce
Author: Ryan Williams <ry...@gmail.com>
AuthorDate: Mon Jul 16 12:18:44 2018 -0400
[BEAM-2810] fastavro integration test (#5862)
* fastavro integration test
---
.../apache_beam/examples/fastavro_it_test.py | 193 +++++++++++++++++++++
1 file changed, 193 insertions(+)
diff --git a/sdks/python/apache_beam/examples/fastavro_it_test.py b/sdks/python/apache_beam/examples/fastavro_it_test.py
new file mode 100644
index 0000000..1504b5b
--- /dev/null
+++ b/sdks/python/apache_beam/examples/fastavro_it_test.py
@@ -0,0 +1,193 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""End-to-end test for Avro IO's fastavro support.
+
+Writes a configurable number of records to a temporary location with each of
+{avro,fastavro}, then reads them back in, joins the two read datasets, and
+verifies they have the same elements.
+
+Usage:
+
+ DataflowRunner:
+ python setup.py nosetests --tests apache_beam.examples.fastavro_it_test \
+ --test-pipeline-options="
+ --runner=TestDataflowRunner
+ --project=...
+ --staging_location=gs://...
+ --temp_location=gs://...
+ --output=gs://...
+ --sdk_location=...
+ "
+
+ DirectRunner:
+ python setup.py nosetests --tests apache_beam.examples.fastavro_it_test \
+ --test-pipeline-options="
+ --output=/tmp
+ --records=5000
+ "
+"""
+
+from __future__ import absolute_import
+from __future__ import division
+
+import logging
+import unittest
+import uuid
+
+import avro
+from nose.plugins.attrib import attr
+
+from apache_beam.io.avroio import ReadAllFromAvro
+from apache_beam.io.avroio import WriteToAvro
+from apache_beam.runners.runner import PipelineState
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.test_utils import delete_files
+from apache_beam.testing.util import BeamAssertException
+from apache_beam.transforms.core import Create
+from apache_beam.transforms.core import FlatMap
+from apache_beam.transforms.core import Map
+from apache_beam.transforms.util import CoGroupByKey
+
+LABELS = ['abc', 'def', 'ghi', 'jkl', 'mno', 'pqr', 'stu', 'vwx']
+COLORS = ['RED', 'ORANGE', 'YELLOW', 'GREEN', 'BLUE', 'PURPLE', None]
+
+
+def record(i):
+ return {
+ 'label': LABELS[i % len(LABELS)],
+ 'number': i,
+ 'number_str': str(i),
+ 'color': COLORS[i % len(COLORS)]
+ }
+
+
+class FastavroIT(unittest.TestCase):
+
+ SCHEMA = avro.schema.parse('''
+ {"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "label", "type": "string"},
+ {"name": "number", "type": ["int", "null"]},
+ {"name": "number_str", "type": ["string", "null"]},
+ {"name": "color", "type": ["string", "null"]}
+ ]
+ }
+ ''')
+
+ def setUp(self):
+ self.test_pipeline = TestPipeline(is_integration_test=True)
+ self.uuid = str(uuid.uuid4())
+ self.output = '/'.join([
+ self.test_pipeline.get_option('output'),
+ self.uuid
+ ])
+
+ @attr('IT')
+ def test_avro_it(self):
+ num_records = self.test_pipeline.get_option('records')
+ num_records = int(num_records) if num_records else 1000000
+
+ # Seed a `PCollection` with indices that will each be FlatMap'd into
+ # `batch_size` records, to avoid having a too-large list in memory at
+ # the outset
+ batch_size = self.test_pipeline.get_option('batch-size')
+ batch_size = int(batch_size) if batch_size else 10000
+
+ # pylint: disable=range-builtin-not-iterating
+ batches = range(int(num_records / batch_size))
+
+ def batch_indices(start):
+ # pylint: disable=range-builtin-not-iterating
+ return range(start * batch_size, (start + 1) * batch_size)
+
+ # A `PCollection` with `num_records` avro records
+ records_pcoll = \
+ self.test_pipeline \
+ | 'create-batches' >> Create(batches) \
+ | 'expand-batches' >> FlatMap(batch_indices) \
+ | 'create-records' >> Map(record)
+
+ fastavro_output = '/'.join([self.output, 'fastavro'])
+ avro_output = '/'.join([self.output, 'avro'])
+
+ self.addCleanup(delete_files, [self.output + '*'])
+
+ # pylint: disable=expression-not-assigned
+ records_pcoll \
+ | 'write_fastavro' >> WriteToAvro(
+ fastavro_output,
+ self.SCHEMA,
+ use_fastavro=True
+ )
+
+ # pylint: disable=expression-not-assigned
+ records_pcoll \
+ | 'write_avro' >> WriteToAvro(
+ avro_output,
+ self.SCHEMA,
+ use_fastavro=False
+ )
+
+ result = self.test_pipeline.run()
+ result.wait_until_finish()
+ assert result.state == PipelineState.DONE
+
+ fastavro_read_pipeline = TestPipeline(is_integration_test=True)
+
+ fastavro_records = \
+ fastavro_read_pipeline \
+ | 'create-fastavro' >> Create(['%s*' % fastavro_output]) \
+ | 'read-fastavro' >> ReadAllFromAvro(use_fastavro=True) \
+ | Map(lambda rec: (rec['number'], rec))
+
+ avro_records = \
+ fastavro_read_pipeline \
+ | 'create-avro' >> Create(['%s*' % avro_output]) \
+ | 'read-avro' >> ReadAllFromAvro(use_fastavro=False) \
+ | Map(lambda rec: (rec['number'], rec))
+
+ def check(elem):
+ v = elem[1]
+
+ def assertEqual(l, r):
+ if l != r:
+ raise BeamAssertException('Assertion failed: %s == %s' % (l, r))
+
+ assertEqual(v.keys(), ['avro', 'fastavro'])
+ avro_values = v['avro']
+ fastavro_values = v['fastavro']
+ assertEqual(avro_values, fastavro_values)
+ assertEqual(len(avro_values), 1)
+
+ # pylint: disable=expression-not-assigned
+ {
+ 'avro': avro_records,
+ 'fastavro': fastavro_records
+ } \
+ | CoGroupByKey() \
+ | Map(check)
+
+ fastavro_read_pipeline.run().wait_until_finish()
+ assert result.state == PipelineState.DONE
+
+
+if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.DEBUG)
+ unittest.main()