You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this plain-text copy.)
Posted to commits@beam.apache.org by dh...@apache.org on 2016/09/13 00:07:30 UTC
[2/3] incubator-beam git commit: Making Dataflow Python Materialized
PCollection representation more efficient (3 of several).
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6a483c18/sdks/python/apache_beam/io/gcsio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcsio_test.py b/sdks/python/apache_beam/io/gcsio_test.py
index 1e2c50e..919e9d2 100644
--- a/sdks/python/apache_beam/io/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcsio_test.py
@@ -14,7 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-
"""Tests for Google Cloud Storage client."""
import logging
@@ -49,10 +48,11 @@ class FakeFile(object):
self.generation = generation
def get_metadata(self):
- return storage.Object(bucket=self.bucket,
- name=self.object,
- generation=self.generation,
- size=len(self.contents))
+ return storage.Object(
+ bucket=self.bucket,
+ name=self.object,
+ generation=self.generation,
+ size=len(self.contents))
class FakeGcsObjects(object):
@@ -81,7 +81,7 @@ class FakeGcsObjects(object):
f = self.get_file(get_request.bucket, get_request.object)
if f is None:
# Failing with a HTTP 404 if file does not exist.
- raise HttpError({'status':404}, None, None)
+ raise HttpError({'status': 404}, None, None)
if download is None:
return f.get_metadata()
else:
@@ -90,6 +90,7 @@ class FakeGcsObjects(object):
def get_range_callback(start, end):
assert start >= 0 and end >= start and end < len(f.contents)
stream.write(f.contents[start:end + 1])
+
download.GetRange = get_range_callback
def Insert(self, insert_request, upload=None): # pylint: disable=invalid-name
@@ -114,13 +115,14 @@ class FakeGcsObjects(object):
src_file = self.get_file(copy_request.sourceBucket,
copy_request.sourceObject)
if not src_file:
- raise HttpError(httplib2.Response({'status': '404'}), '404 Not Found',
- 'https://fake/url')
+ raise HttpError(
+ httplib2.Response({'status': '404'}), '404 Not Found',
+ 'https://fake/url')
generation = self.get_last_generation(copy_request.destinationBucket,
copy_request.destinationObject) + 1
dest_file = FakeFile(copy_request.destinationBucket,
- copy_request.destinationObject,
- src_file.contents, generation)
+ copy_request.destinationObject, src_file.contents,
+ generation)
self.add_file(dest_file)
def Delete(self, delete_request): # pylint: disable=invalid-name
@@ -129,8 +131,9 @@ class FakeGcsObjects(object):
if self.get_file(delete_request.bucket, delete_request.object):
self.delete_file(delete_request.bucket, delete_request.object)
else:
- raise HttpError(httplib2.Response({'status': '404'}), '404 Not Found',
- 'https://fake/url')
+ raise HttpError(
+ httplib2.Response({'status': '404'}), '404 Not Found',
+ 'https://fake/url')
def List(self, list_request): # pylint: disable=invalid-name
bucket = list_request.bucket
@@ -202,7 +205,7 @@ class TestGCSIO(unittest.TestCase):
def test_exists_failure(self, mock_get):
# Raising an error other than 404. Raising 404 is a valid failure for
# exists() call.
- mock_get.side_effect = HttpError({'status':400}, None, None)
+ mock_get.side_effect = HttpError({'status': 400}, None, None)
file_name = 'gs://gcsio-test/dummy_file'
file_size = 1234
self._insert_random_file(self.client, file_name, file_size)
@@ -240,34 +243,32 @@ class TestGCSIO(unittest.TestCase):
self.gcs.delete(file_name)
self._insert_random_file(self.client, file_name, file_size)
- self.assertTrue(gcsio.parse_gcs_path(file_name) in
- self.client.objects.files)
+ self.assertTrue(
+ gcsio.parse_gcs_path(file_name) in self.client.objects.files)
self.gcs.delete(file_name)
- self.assertFalse(gcsio.parse_gcs_path(file_name) in
- self.client.objects.files)
+ self.assertFalse(
+ gcsio.parse_gcs_path(file_name) in self.client.objects.files)
def test_copy(self):
src_file_name = 'gs://gcsio-test/source'
dest_file_name = 'gs://gcsio-test/dest'
file_size = 1024
- self._insert_random_file(self.client, src_file_name,
- file_size)
- self.assertTrue(gcsio.parse_gcs_path(src_file_name) in
- self.client.objects.files)
- self.assertFalse(gcsio.parse_gcs_path(dest_file_name) in
- self.client.objects.files)
+ self._insert_random_file(self.client, src_file_name, file_size)
+ self.assertTrue(
+ gcsio.parse_gcs_path(src_file_name) in self.client.objects.files)
+ self.assertFalse(
+ gcsio.parse_gcs_path(dest_file_name) in self.client.objects.files)
self.gcs.copy(src_file_name, dest_file_name)
- self.assertTrue(gcsio.parse_gcs_path(src_file_name) in
- self.client.objects.files)
- self.assertTrue(gcsio.parse_gcs_path(dest_file_name) in
- self.client.objects.files)
+ self.assertTrue(
+ gcsio.parse_gcs_path(src_file_name) in self.client.objects.files)
+ self.assertTrue(
+ gcsio.parse_gcs_path(dest_file_name) in self.client.objects.files)
- self.assertRaises(IOError, self.gcs.copy,
- 'gs://gcsio-test/non-existent',
+ self.assertRaises(IOError, self.gcs.copy, 'gs://gcsio-test/non-existent',
'gs://gcsio-test/non-existent-destination')
def test_copytree(self):
@@ -278,46 +279,45 @@ class TestGCSIO(unittest.TestCase):
for path in paths:
src_file_name = src_dir_name + path
dest_file_name = dest_dir_name + path
- self._insert_random_file(self.client, src_file_name,
- file_size)
- self.assertTrue(gcsio.parse_gcs_path(src_file_name) in
- self.client.objects.files)
- self.assertFalse(gcsio.parse_gcs_path(dest_file_name) in
- self.client.objects.files)
+ self._insert_random_file(self.client, src_file_name, file_size)
+ self.assertTrue(
+ gcsio.parse_gcs_path(src_file_name) in self.client.objects.files)
+ self.assertFalse(
+ gcsio.parse_gcs_path(dest_file_name) in self.client.objects.files)
self.gcs.copytree(src_dir_name, dest_dir_name)
for path in paths:
src_file_name = src_dir_name + path
dest_file_name = dest_dir_name + path
- self.assertTrue(gcsio.parse_gcs_path(src_file_name) in
- self.client.objects.files)
- self.assertTrue(gcsio.parse_gcs_path(dest_file_name) in
- self.client.objects.files)
+ self.assertTrue(
+ gcsio.parse_gcs_path(src_file_name) in self.client.objects.files)
+ self.assertTrue(
+ gcsio.parse_gcs_path(dest_file_name) in self.client.objects.files)
def test_rename(self):
src_file_name = 'gs://gcsio-test/source'
dest_file_name = 'gs://gcsio-test/dest'
file_size = 1024
- self._insert_random_file(self.client, src_file_name,
- file_size)
- self.assertTrue(gcsio.parse_gcs_path(src_file_name) in
- self.client.objects.files)
- self.assertFalse(gcsio.parse_gcs_path(dest_file_name) in
- self.client.objects.files)
+ self._insert_random_file(self.client, src_file_name, file_size)
+ self.assertTrue(
+ gcsio.parse_gcs_path(src_file_name) in self.client.objects.files)
+ self.assertFalse(
+ gcsio.parse_gcs_path(dest_file_name) in self.client.objects.files)
self.gcs.rename(src_file_name, dest_file_name)
- self.assertFalse(gcsio.parse_gcs_path(src_file_name) in
- self.client.objects.files)
- self.assertTrue(gcsio.parse_gcs_path(dest_file_name) in
- self.client.objects.files)
+ self.assertFalse(
+ gcsio.parse_gcs_path(src_file_name) in self.client.objects.files)
+ self.assertTrue(
+ gcsio.parse_gcs_path(dest_file_name) in self.client.objects.files)
def test_full_file_read(self):
file_name = 'gs://gcsio-test/full_file'
file_size = 5 * 1024 * 1024 + 100
random_file = self._insert_random_file(self.client, file_name, file_size)
f = self.gcs.open(file_name)
+ self.assertEqual(f.mode, 'r')
f.seek(0, os.SEEK_END)
self.assertEqual(f.tell(), file_size)
self.assertEqual(f.read(), '')
@@ -337,8 +337,8 @@ class TestGCSIO(unittest.TestCase):
start, end = min(a, b), max(a, b)
f.seek(start)
self.assertEqual(f.tell(), start)
- self.assertEqual(f.read(end - start + 1),
- random_file.contents[start:end + 1])
+ self.assertEqual(
+ f.read(end - start + 1), random_file.contents[start:end + 1])
self.assertEqual(f.tell(), end + 1)
def test_file_read_line(self):
@@ -398,6 +398,7 @@ class TestGCSIO(unittest.TestCase):
file_size = 5 * 1024 * 1024 + 2000
contents = os.urandom(file_size)
f = self.gcs.open(file_name, 'w')
+ self.assertEqual(f.mode, 'w')
f.write(contents[0:1000])
f.write(contents[1000:1024 * 1024])
f.write(contents[1024 * 1024:])
@@ -406,6 +407,36 @@ class TestGCSIO(unittest.TestCase):
self.assertEqual(
self.client.objects.get_file(bucket, name).contents, contents)
+ def test_file_close(self):
+ file_name = 'gs://gcsio-test/close_file'
+ file_size = 5 * 1024 * 1024 + 2000
+ contents = os.urandom(file_size)
+ f = self.gcs.open(file_name, 'w')
+ self.assertEqual(f.mode, 'w')
+ f.write(contents)
+ f.close()
+ f.close() # This should not crash.
+ bucket, name = gcsio.parse_gcs_path(file_name)
+ self.assertEqual(
+ self.client.objects.get_file(bucket, name).contents, contents)
+
+ def test_file_flush(self):
+ file_name = 'gs://gcsio-test/flush_file'
+ file_size = 5 * 1024 * 1024 + 2000
+ contents = os.urandom(file_size)
+ bucket, name = gcsio.parse_gcs_path(file_name)
+ f = self.gcs.open(file_name, 'w')
+ self.assertEqual(f.mode, 'w')
+ f.write(contents[0:1000])
+ f.flush()
+ f.write(contents[1000:1024 * 1024])
+ f.flush()
+ f.flush() # Should be a NOOP.
+ f.write(contents[1024 * 1024:])
+ f.close() # This should already call the equivalent of flush() in its body.
+ self.assertEqual(
+ self.client.objects.get_file(bucket, name).contents, contents)
+
def test_context_manager(self):
# Test writing with a context manager.
file_name = 'gs://gcsio-test/context_manager_file'
@@ -496,10 +527,10 @@ class TestGCSIO(unittest.TestCase):
]),
]
for file_pattern, expected_object_names in test_cases:
- expected_file_names = ['gs://%s/%s' % (bucket_name, o) for o in
- expected_object_names]
- self.assertEqual(set(self.gcs.glob(file_pattern)),
- set(expected_file_names))
+ expected_file_names = ['gs://%s/%s' % (bucket_name, o)
+ for o in expected_object_names]
+ self.assertEqual(
+ set(self.gcs.glob(file_pattern)), set(expected_file_names))
class TestPipeStream(unittest.TestCase):
@@ -525,7 +556,7 @@ class TestPipeStream(unittest.TestCase):
self.assertEqual(''.join(data_list), expected)
def test_pipe_stream(self):
- block_sizes = list(4 ** i for i in range(0, 12))
+ block_sizes = list(4**i for i in range(0, 12))
data_blocks = list(os.urandom(size) for size in block_sizes)
expected = ''.join(data_blocks)
@@ -534,8 +565,8 @@ class TestPipeStream(unittest.TestCase):
for buffer_size in buffer_sizes:
parent_conn, child_conn = multiprocessing.Pipe()
stream = gcsio.GcsBufferedWriter.PipeStream(child_conn)
- child_thread = threading.Thread(target=self._read_and_verify,
- args=(stream, expected, buffer_size))
+ child_thread = threading.Thread(
+ target=self._read_and_verify, args=(stream, expected, buffer_size))
child_thread.start()
for data in data_blocks:
parent_conn.send_bytes(data)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6a483c18/sdks/python/apache_beam/runners/inprocess/inprocess_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/inprocess_runner_test.py b/sdks/python/apache_beam/runners/inprocess/inprocess_runner_test.py
index 4e84147..3ab8383 100644
--- a/sdks/python/apache_beam/runners/inprocess/inprocess_runner_test.py
+++ b/sdks/python/apache_beam/runners/inprocess/inprocess_runner_test.py
@@ -87,9 +87,12 @@ class TestTextFileSource(
pass
-class NativeTestTextFileSink(
- TestWithInProcessPipelineRunner, fileio_test.NativeTestTextFileSink):
- pass
+class TestNativeTextFileSink(
+ TestWithInProcessPipelineRunner, fileio_test.TestNativeTextFileSink):
+
+ def setUp(self):
+ TestWithInProcessPipelineRunner.setUp(self)
+ fileio_test.TestNativeTextFileSink.setUp(self)
class TestTextFileSink(