Posted to commits@beam.apache.org by dh...@apache.org on 2016/09/13 00:07:30 UTC

[2/3] incubator-beam git commit: Making Dataflow Python Materialized PCollection representation more efficient (3 of several).

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6a483c18/sdks/python/apache_beam/io/gcsio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcsio_test.py b/sdks/python/apache_beam/io/gcsio_test.py
index 1e2c50e..919e9d2 100644
--- a/sdks/python/apache_beam/io/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcsio_test.py
@@ -14,7 +14,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
 """Tests for Google Cloud Storage client."""
 
 import logging
@@ -49,10 +48,11 @@ class FakeFile(object):
     self.generation = generation
 
   def get_metadata(self):
-    return storage.Object(bucket=self.bucket,
-                          name=self.object,
-                          generation=self.generation,
-                          size=len(self.contents))
+    return storage.Object(
+        bucket=self.bucket,
+        name=self.object,
+        generation=self.generation,
+        size=len(self.contents))
 
 
 class FakeGcsObjects(object):
@@ -81,7 +81,7 @@ class FakeGcsObjects(object):
     f = self.get_file(get_request.bucket, get_request.object)
     if f is None:
       # Failing with a HTTP 404 if file does not exist.
-      raise HttpError({'status':404}, None, None)
+      raise HttpError({'status': 404}, None, None)
     if download is None:
       return f.get_metadata()
     else:
@@ -90,6 +90,7 @@ class FakeGcsObjects(object):
       def get_range_callback(start, end):
         assert start >= 0 and end >= start and end < len(f.contents)
         stream.write(f.contents[start:end + 1])
+
       download.GetRange = get_range_callback
 
   def Insert(self, insert_request, upload=None):  # pylint: disable=invalid-name
@@ -114,13 +115,14 @@ class FakeGcsObjects(object):
     src_file = self.get_file(copy_request.sourceBucket,
                              copy_request.sourceObject)
     if not src_file:
-      raise HttpError(httplib2.Response({'status': '404'}), '404 Not Found',
-                      'https://fake/url')
+      raise HttpError(
+          httplib2.Response({'status': '404'}), '404 Not Found',
+          'https://fake/url')
     generation = self.get_last_generation(copy_request.destinationBucket,
                                           copy_request.destinationObject) + 1
     dest_file = FakeFile(copy_request.destinationBucket,
-                         copy_request.destinationObject,
-                         src_file.contents, generation)
+                         copy_request.destinationObject, src_file.contents,
+                         generation)
     self.add_file(dest_file)
 
   def Delete(self, delete_request):  # pylint: disable=invalid-name
@@ -129,8 +131,9 @@ class FakeGcsObjects(object):
     if self.get_file(delete_request.bucket, delete_request.object):
       self.delete_file(delete_request.bucket, delete_request.object)
     else:
-      raise HttpError(httplib2.Response({'status': '404'}), '404 Not Found',
-                      'https://fake/url')
+      raise HttpError(
+          httplib2.Response({'status': '404'}), '404 Not Found',
+          'https://fake/url')
 
   def List(self, list_request):  # pylint: disable=invalid-name
     bucket = list_request.bucket
@@ -202,7 +205,7 @@ class TestGCSIO(unittest.TestCase):
   def test_exists_failure(self, mock_get):
     # Raising an error other than 404. Raising 404 is a valid failure for
     # exists() call.
-    mock_get.side_effect = HttpError({'status':400}, None, None)
+    mock_get.side_effect = HttpError({'status': 400}, None, None)
     file_name = 'gs://gcsio-test/dummy_file'
     file_size = 1234
     self._insert_random_file(self.client, file_name, file_size)
@@ -240,34 +243,32 @@ class TestGCSIO(unittest.TestCase):
     self.gcs.delete(file_name)
 
     self._insert_random_file(self.client, file_name, file_size)
-    self.assertTrue(gcsio.parse_gcs_path(file_name) in
-                    self.client.objects.files)
+    self.assertTrue(
+        gcsio.parse_gcs_path(file_name) in self.client.objects.files)
 
     self.gcs.delete(file_name)
 
-    self.assertFalse(gcsio.parse_gcs_path(file_name) in
-                     self.client.objects.files)
+    self.assertFalse(
+        gcsio.parse_gcs_path(file_name) in self.client.objects.files)
 
   def test_copy(self):
     src_file_name = 'gs://gcsio-test/source'
     dest_file_name = 'gs://gcsio-test/dest'
     file_size = 1024
-    self._insert_random_file(self.client, src_file_name,
-                             file_size)
-    self.assertTrue(gcsio.parse_gcs_path(src_file_name) in
-                    self.client.objects.files)
-    self.assertFalse(gcsio.parse_gcs_path(dest_file_name) in
-                     self.client.objects.files)
+    self._insert_random_file(self.client, src_file_name, file_size)
+    self.assertTrue(
+        gcsio.parse_gcs_path(src_file_name) in self.client.objects.files)
+    self.assertFalse(
+        gcsio.parse_gcs_path(dest_file_name) in self.client.objects.files)
 
     self.gcs.copy(src_file_name, dest_file_name)
 
-    self.assertTrue(gcsio.parse_gcs_path(src_file_name) in
-                    self.client.objects.files)
-    self.assertTrue(gcsio.parse_gcs_path(dest_file_name) in
-                    self.client.objects.files)
+    self.assertTrue(
+        gcsio.parse_gcs_path(src_file_name) in self.client.objects.files)
+    self.assertTrue(
+        gcsio.parse_gcs_path(dest_file_name) in self.client.objects.files)
 
-    self.assertRaises(IOError, self.gcs.copy,
-                      'gs://gcsio-test/non-existent',
+    self.assertRaises(IOError, self.gcs.copy, 'gs://gcsio-test/non-existent',
                       'gs://gcsio-test/non-existent-destination')
 
   def test_copytree(self):
@@ -278,46 +279,45 @@ class TestGCSIO(unittest.TestCase):
     for path in paths:
       src_file_name = src_dir_name + path
       dest_file_name = dest_dir_name + path
-      self._insert_random_file(self.client, src_file_name,
-                               file_size)
-      self.assertTrue(gcsio.parse_gcs_path(src_file_name) in
-                      self.client.objects.files)
-      self.assertFalse(gcsio.parse_gcs_path(dest_file_name) in
-                       self.client.objects.files)
+      self._insert_random_file(self.client, src_file_name, file_size)
+      self.assertTrue(
+          gcsio.parse_gcs_path(src_file_name) in self.client.objects.files)
+      self.assertFalse(
+          gcsio.parse_gcs_path(dest_file_name) in self.client.objects.files)
 
     self.gcs.copytree(src_dir_name, dest_dir_name)
 
     for path in paths:
       src_file_name = src_dir_name + path
       dest_file_name = dest_dir_name + path
-      self.assertTrue(gcsio.parse_gcs_path(src_file_name) in
-                      self.client.objects.files)
-      self.assertTrue(gcsio.parse_gcs_path(dest_file_name) in
-                      self.client.objects.files)
+      self.assertTrue(
+          gcsio.parse_gcs_path(src_file_name) in self.client.objects.files)
+      self.assertTrue(
+          gcsio.parse_gcs_path(dest_file_name) in self.client.objects.files)
 
   def test_rename(self):
     src_file_name = 'gs://gcsio-test/source'
     dest_file_name = 'gs://gcsio-test/dest'
     file_size = 1024
-    self._insert_random_file(self.client, src_file_name,
-                             file_size)
-    self.assertTrue(gcsio.parse_gcs_path(src_file_name) in
-                    self.client.objects.files)
-    self.assertFalse(gcsio.parse_gcs_path(dest_file_name) in
-                     self.client.objects.files)
+    self._insert_random_file(self.client, src_file_name, file_size)
+    self.assertTrue(
+        gcsio.parse_gcs_path(src_file_name) in self.client.objects.files)
+    self.assertFalse(
+        gcsio.parse_gcs_path(dest_file_name) in self.client.objects.files)
 
     self.gcs.rename(src_file_name, dest_file_name)
 
-    self.assertFalse(gcsio.parse_gcs_path(src_file_name) in
-                     self.client.objects.files)
-    self.assertTrue(gcsio.parse_gcs_path(dest_file_name) in
-                    self.client.objects.files)
+    self.assertFalse(
+        gcsio.parse_gcs_path(src_file_name) in self.client.objects.files)
+    self.assertTrue(
+        gcsio.parse_gcs_path(dest_file_name) in self.client.objects.files)
 
   def test_full_file_read(self):
     file_name = 'gs://gcsio-test/full_file'
     file_size = 5 * 1024 * 1024 + 100
     random_file = self._insert_random_file(self.client, file_name, file_size)
     f = self.gcs.open(file_name)
+    self.assertEqual(f.mode, 'r')
     f.seek(0, os.SEEK_END)
     self.assertEqual(f.tell(), file_size)
     self.assertEqual(f.read(), '')
@@ -337,8 +337,8 @@ class TestGCSIO(unittest.TestCase):
       start, end = min(a, b), max(a, b)
       f.seek(start)
       self.assertEqual(f.tell(), start)
-      self.assertEqual(f.read(end - start + 1),
-                       random_file.contents[start:end + 1])
+      self.assertEqual(
+          f.read(end - start + 1), random_file.contents[start:end + 1])
       self.assertEqual(f.tell(), end + 1)
 
   def test_file_read_line(self):
@@ -398,6 +398,7 @@ class TestGCSIO(unittest.TestCase):
     file_size = 5 * 1024 * 1024 + 2000
     contents = os.urandom(file_size)
     f = self.gcs.open(file_name, 'w')
+    self.assertEqual(f.mode, 'w')
     f.write(contents[0:1000])
     f.write(contents[1000:1024 * 1024])
     f.write(contents[1024 * 1024:])
@@ -406,6 +407,36 @@ class TestGCSIO(unittest.TestCase):
     self.assertEqual(
         self.client.objects.get_file(bucket, name).contents, contents)
 
+  def test_file_close(self):
+    file_name = 'gs://gcsio-test/close_file'
+    file_size = 5 * 1024 * 1024 + 2000
+    contents = os.urandom(file_size)
+    f = self.gcs.open(file_name, 'w')
+    self.assertEqual(f.mode, 'w')
+    f.write(contents)
+    f.close()
+    f.close()  # This should not crash.
+    bucket, name = gcsio.parse_gcs_path(file_name)
+    self.assertEqual(
+        self.client.objects.get_file(bucket, name).contents, contents)
+
+  def test_file_flush(self):
+    file_name = 'gs://gcsio-test/flush_file'
+    file_size = 5 * 1024 * 1024 + 2000
+    contents = os.urandom(file_size)
+    bucket, name = gcsio.parse_gcs_path(file_name)
+    f = self.gcs.open(file_name, 'w')
+    self.assertEqual(f.mode, 'w')
+    f.write(contents[0:1000])
+    f.flush()
+    f.write(contents[1000:1024 * 1024])
+    f.flush()
+    f.flush()  # Should be a NOOP.
+    f.write(contents[1024 * 1024:])
+    f.close()  # This should already call the equivalent of flush() in its body.
+    self.assertEqual(
+        self.client.objects.get_file(bucket, name).contents, contents)
+
   def test_context_manager(self):
     # Test writing with a context manager.
     file_name = 'gs://gcsio-test/context_manager_file'
@@ -496,10 +527,10 @@ class TestGCSIO(unittest.TestCase):
         ]),
     ]
     for file_pattern, expected_object_names in test_cases:
-      expected_file_names = ['gs://%s/%s' % (bucket_name, o) for o in
-                             expected_object_names]
-      self.assertEqual(set(self.gcs.glob(file_pattern)),
-                       set(expected_file_names))
+      expected_file_names = ['gs://%s/%s' % (bucket_name, o)
+                             for o in expected_object_names]
+      self.assertEqual(
+          set(self.gcs.glob(file_pattern)), set(expected_file_names))
 
 
 class TestPipeStream(unittest.TestCase):
@@ -525,7 +556,7 @@ class TestPipeStream(unittest.TestCase):
     self.assertEqual(''.join(data_list), expected)
 
   def test_pipe_stream(self):
-    block_sizes = list(4 ** i for i in range(0, 12))
+    block_sizes = list(4**i for i in range(0, 12))
     data_blocks = list(os.urandom(size) for size in block_sizes)
     expected = ''.join(data_blocks)
 
@@ -534,8 +565,8 @@ class TestPipeStream(unittest.TestCase):
     for buffer_size in buffer_sizes:
       parent_conn, child_conn = multiprocessing.Pipe()
       stream = gcsio.GcsBufferedWriter.PipeStream(child_conn)
-      child_thread = threading.Thread(target=self._read_and_verify,
-                                      args=(stream, expected, buffer_size))
+      child_thread = threading.Thread(
+          target=self._read_and_verify, args=(stream, expected, buffer_size))
       child_thread.start()
       for data in data_blocks:
         parent_conn.send_bytes(data)

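For reference, a minimal usage sketch (not part of the commit) of the GcsIO write path that the new test_file_flush and test_file_close cases above exercise. gcsio.GcsIO, open(), write(), flush() and close() all appear in the diff; the bucket/object path and the no-argument constructor below are assumptions for illustration only:

  from apache_beam.io import gcsio

  gcs = gcsio.GcsIO()  # assumed: constructed without an explicit storage client
  f = gcs.open('gs://example-bucket/example-object', 'w')  # hypothetical path
  f.write('first chunk of bytes')
  f.flush()   # pushes buffered data toward the upload; a repeated flush() is a no-op
  f.write('second chunk of bytes')
  f.close()   # flushes any remaining data and finalizes the upload
  f.close()   # a second close() is expected to be harmless, per test_file_close
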
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6a483c18/sdks/python/apache_beam/runners/inprocess/inprocess_runner_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/inprocess/inprocess_runner_test.py b/sdks/python/apache_beam/runners/inprocess/inprocess_runner_test.py
index 4e84147..3ab8383 100644
--- a/sdks/python/apache_beam/runners/inprocess/inprocess_runner_test.py
+++ b/sdks/python/apache_beam/runners/inprocess/inprocess_runner_test.py
@@ -87,9 +87,12 @@ class TestTextFileSource(
   pass
 
 
-class NativeTestTextFileSink(
-    TestWithInProcessPipelineRunner, fileio_test.NativeTestTextFileSink):
-  pass
+class TestNativeTextFileSink(
+    TestWithInProcessPipelineRunner, fileio_test.TestNativeTextFileSink):
+
+  def setUp(self):
+    TestWithInProcessPipelineRunner.setUp(self)
+    fileio_test.TestNativeTextFileSink.setUp(self)
 
 
 class TestTextFileSink(