You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/04/02 17:52:46 UTC

[arrow] branch master updated: ARROW-4945: [Flight] Enable integration tests in Travis

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ef662f  ARROW-4945: [Flight] Enable integration tests in Travis
1ef662f is described below

commit 1ef662f8f1517c3a3295fd844c848dbb582956dd
Author: David Li <Da...@twosigma.com>
AuthorDate: Tue Apr 2 12:52:39 2019 -0500

    ARROW-4945: [Flight] Enable integration tests in Travis
    
    Author: David Li <Da...@twosigma.com>
    Author: Kenta Murata <mr...@mrkn.jp>
    
    Closes #4003 from lihalite/full-integration-tests and squashes the following commits:
    
    3e1d8dac9 <David Li> Enable Flight integration tests in Travis
    20555c89e <David Li> Add SKIP support to integration tests
    a49d1779b <Kenta Murata> ARROW-5050:  Specify dependencies of grpc_ep
---
 .travis.yml                     |   2 +
 ci/travis_script_integration.sh |   2 +-
 integration/integration_test.py | 120 +++++++++++++++++++++++++++-------------
 3 files changed, 84 insertions(+), 40 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index e6ebd5a..90590b5 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -228,6 +228,8 @@ matrix:
     env: ARROW_TEST_GROUP=integration
     jdk: openjdk8
     env:
+    - ARROW_TRAVIS_USE_TOOLCHAIN=1
+    - ARROW_TRAVIS_FLIGHT=1
     - ARROW_TRAVIS_PLASMA=1
     - ARROW_TRAVIS_PLASMA_JAVA_CLIENT=1
     - ARROW_TRAVIS_USE_TOOLCHAIN=1
diff --git a/ci/travis_script_integration.sh b/ci/travis_script_integration.sh
index e8d67ac..c51005d 100755
--- a/ci/travis_script_integration.sh
+++ b/ci/travis_script_integration.sh
@@ -51,7 +51,7 @@ conda install -y -q python=3.6 six numpy
 INTEGRATION_TEMPDIR=$TRAVIS_BUILD_DIR/integration_temp
 mkdir -p $INTEGRATION_TEMPDIR
 
-python integration_test.py --debug --tempdir=$INTEGRATION_TEMPDIR
+python integration_test.py --debug --tempdir=$INTEGRATION_TEMPDIR --run_flight
 
 popd
 
diff --git a/integration/integration_test.py b/integration/integration_test.py
index d6cb965..b184cd2 100644
--- a/integration/integration_test.py
+++ b/integration/integration_test.py
@@ -747,13 +747,23 @@ class JsonRecordBatch(object):
         ])
 
 
+# SKIP categories
+SKIP_ARROW = 'arrow'
+SKIP_FLIGHT = 'flight'
+
+
 class JsonFile(object):
 
-    def __init__(self, name, schema, batches, dictionaries=None):
+    def __init__(self, name, schema, batches, dictionaries=None,
+                 skip=None, path=None):
         self.name = name
         self.schema = schema
         self.dictionaries = dictionaries or []
         self.batches = batches
+        self.skip = set()
+        self.path = path
+        if skip:
+            self.skip.update(skip)
 
     def get_json(self):
         entries = [
@@ -772,6 +782,15 @@ class JsonFile(object):
     def write(self, path):
         with open(path, 'wb') as f:
             f.write(json.dumps(self.get_json(), indent=2).encode('utf-8'))
+        self.path = path
+
+    def skip_category(self, category):
+        """Skip this test for the given category.
+
+        Category should be SKIP_ARROW or SKIP_FLIGHT.
+        """
+        self.skip.add(category)
+        return self
 
 
 def get_field(name, type_, nullable=True):
@@ -799,7 +818,7 @@ def get_field(name, type_, nullable=True):
         raise TypeError(dtype)
 
 
-def _generate_file(name, fields, batch_sizes, dictionaries=None):
+def _generate_file(name, fields, batch_sizes, dictionaries=None, skip=None):
     schema = JsonSchema(fields)
     batches = []
     for size in batch_sizes:
@@ -810,7 +829,7 @@ def _generate_file(name, fields, batch_sizes, dictionaries=None):
 
         batches.append(JsonRecordBatch(size, columns))
 
-    return JsonFile(name, schema, batches, dictionaries)
+    return JsonFile(name, schema, batches, dictionaries, skip=skip)
 
 
 def generate_primitive_case(batch_sizes, name='primitive'):
@@ -908,11 +927,11 @@ def get_generated_json_files(tempdir=None, flight=False):
         generate_decimal_case(),
         generate_datetime_case(),
         generate_nested_case(),
-        generate_dictionary_case()
+        generate_dictionary_case().skip_category(SKIP_FLIGHT),
     ]
 
     if flight:
-        file_objs.append(generate_primitive_case([32 * 1024],
+        file_objs.append(generate_primitive_case([24 * 1024],
                                                  name='large_batch'))
 
     generated_paths = []
@@ -920,7 +939,7 @@ def get_generated_json_files(tempdir=None, flight=False):
         out_path = os.path.join(tempdir, 'generated_' +
                                 file_obj.name + '.json')
         file_obj.write(out_path)
-        generated_paths.append(out_path)
+        generated_paths.append(file_obj)
 
     return generated_paths
 
@@ -942,11 +961,8 @@ class IntegrationRunner(object):
         for producer, consumer in itertools.product(
                 filter(lambda t: t.PRODUCER, self.testers),
                 filter(lambda t: t.CONSUMER, self.testers)):
-            try:
-                self._compare_implementations(producer, consumer)
-            except Exception:
-                traceback.print_exc()
-                failures.append((producer, consumer, sys.exc_info()))
+            for failure in self._compare_implementations(producer, consumer):
+                failures.append(failure)
         return failures
 
     def run_flight(self):
@@ -967,7 +983,8 @@ class IntegrationRunner(object):
         )
         print('##########################################################')
 
-        for json_path in self.json_files:
+        for test_case in self.json_files:
+            json_path = test_case.path
             print('==========================================================')
             print('Testing file {0}'.format(json_path))
             print('==========================================================')
@@ -977,27 +994,37 @@ class IntegrationRunner(object):
             file_id = guid()[:8]
 
             # Make the random access file
-            print('-- Creating binary inputs')
             producer_file_path = os.path.join(self.temp_dir, file_id + '_' +
                                               name + '.json_as_file')
-            producer.json_to_file(json_path, producer_file_path)
-
-            # Validate the file
-            print('-- Validating file')
-            consumer.validate(json_path, producer_file_path)
-
-            print('-- Validating stream')
             producer_stream_path = os.path.join(self.temp_dir, file_id + '_' +
                                                 name +
                                                 '.producer_file_as_stream')
             consumer_file_path = os.path.join(self.temp_dir, file_id + '_' +
                                               name +
                                               '.consumer_stream_as_file')
-            producer.file_to_stream(producer_file_path,
-                                    producer_stream_path)
-            consumer.stream_to_file(producer_stream_path,
-                                    consumer_file_path)
-            consumer.validate(json_path, consumer_file_path)
+
+            if SKIP_ARROW in test_case.skip:
+                print('-- Skipping test')
+                continue
+
+            try:
+                print('-- Creating binary inputs')
+                producer.json_to_file(json_path, producer_file_path)
+
+                # Validate the file
+                print('-- Validating file')
+                consumer.validate(json_path, producer_file_path)
+
+                print('-- Validating stream')
+                producer.file_to_stream(producer_file_path,
+                                        producer_stream_path)
+                consumer.stream_to_file(producer_stream_path,
+                                        consumer_file_path)
+                consumer.validate(json_path, consumer_file_path)
+            except Exception:
+                traceback.print_exc()
+                yield (test_case, producer, consumer, sys.exc_info())
+                continue
 
     def _compare_flight_implementations(self, producer, consumer):
         print('##########################################################')
@@ -1006,19 +1033,25 @@ class IntegrationRunner(object):
         )
         print('##########################################################')
 
-        for json_path in self.json_files:
+        for test_case in self.json_files:
+            json_path = test_case.path
             print('=' * 58)
             print('Testing file {0}'.format(json_path))
             print('=' * 58)
 
-            with producer.flight_server():
-                # Have the client upload the file, then download and
-                # compare
-                try:
+            if SKIP_FLIGHT in test_case.skip:
+                print('-- Skipping test')
+                continue
+
+            try:
+                with producer.flight_server():
+                    # Have the client upload the file, then download and
+                    # compare
                     consumer.flight_request(producer.FLIGHT_PORT, json_path)
-                except Exception:
-                    traceback.print_exc()
-                    yield (producer, consumer, sys.exc_info())
+            except Exception:
+                traceback.print_exc()
+                yield (test_case, producer, consumer, sys.exc_info())
+                continue
 
 
 class Tester(object):
@@ -1140,7 +1173,7 @@ class JavaTester(Tester):
                     output)
             yield
         finally:
-            server.terminate()
+            server.kill()
             server.wait(5)
 
 
@@ -1219,7 +1252,7 @@ class CPPTester(Tester):
                     output)
             yield
         finally:
-            server.terminate()
+            server.kill()
             server.wait(5)
 
     def flight_request(self, port, json_path):
@@ -1295,7 +1328,9 @@ class JSTester(Tester):
 
 def get_static_json_files():
     glob_pattern = os.path.join(ARROW_HOME, 'integration', 'data', '*.json')
-    return glob.glob(glob_pattern)
+    return [JsonFile(name=os.path.basename(p), path=p, skip=set(),
+                     schema=None, batches=None)
+            for p in glob.glob(glob_pattern)]
 
 
 def run_all_tests(run_flight=False, debug=False, tempdir=None):
@@ -1314,13 +1349,20 @@ def run_all_tests(run_flight=False, debug=False, tempdir=None):
     if run_flight:
         failures.extend(runner.run_flight())
 
+    fail_count = 0
     if failures:
         print("################# FAILURES #################")
-        for producer, consumer, exc_info in failures:
-            print("FAILED TEST:", producer.name, "producing, ",
+        for test_case, producer, consumer, exc_info in failures:
+            fail_count += 1
+            print("FAILED TEST:", end=" ")
+            print(test_case.name, producer.name, "producing, ",
                   consumer.name, "consuming")
-            traceback.print_exception(*exc_info)
+            if exc_info:
+                traceback.print_exception(*exc_info)
             print()
+
+    print(fail_count, "failures")
+    if fail_count > 0:
         sys.exit(1)