You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/04/02 17:52:46 UTC
[arrow] branch master updated: ARROW-4945: [Flight] Enable
integration tests in Travis
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 1ef662f ARROW-4945: [Flight] Enable integration tests in Travis
1ef662f is described below
commit 1ef662f8f1517c3a3295fd844c848dbb582956dd
Author: David Li <Da...@twosigma.com>
AuthorDate: Tue Apr 2 12:52:39 2019 -0500
ARROW-4945: [Flight] Enable integration tests in Travis
Author: David Li <Da...@twosigma.com>
Author: Kenta Murata <mr...@mrkn.jp>
Closes #4003 from lihalite/full-integration-tests and squashes the following commits:
3e1d8dac9 <David Li> Enable Flight integration tests in Travis
20555c89e <David Li> Add SKIP support to integration tests
a49d1779b <Kenta Murata> ARROW-5050: Specify dependencies of grpc_ep
---
.travis.yml | 2 +
ci/travis_script_integration.sh | 2 +-
integration/integration_test.py | 120 +++++++++++++++++++++++++++-------------
3 files changed, 84 insertions(+), 40 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index e6ebd5a..90590b5 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -228,6 +228,8 @@ matrix:
env: ARROW_TEST_GROUP=integration
jdk: openjdk8
env:
+ - ARROW_TRAVIS_USE_TOOLCHAIN=1
+ - ARROW_TRAVIS_FLIGHT=1
- ARROW_TRAVIS_PLASMA=1
- ARROW_TRAVIS_PLASMA_JAVA_CLIENT=1
- ARROW_TRAVIS_USE_TOOLCHAIN=1
diff --git a/ci/travis_script_integration.sh b/ci/travis_script_integration.sh
index e8d67ac..c51005d 100755
--- a/ci/travis_script_integration.sh
+++ b/ci/travis_script_integration.sh
@@ -51,7 +51,7 @@ conda install -y -q python=3.6 six numpy
INTEGRATION_TEMPDIR=$TRAVIS_BUILD_DIR/integration_temp
mkdir -p $INTEGRATION_TEMPDIR
-python integration_test.py --debug --tempdir=$INTEGRATION_TEMPDIR
+python integration_test.py --debug --tempdir=$INTEGRATION_TEMPDIR --run_flight
popd
diff --git a/integration/integration_test.py b/integration/integration_test.py
index d6cb965..b184cd2 100644
--- a/integration/integration_test.py
+++ b/integration/integration_test.py
@@ -747,13 +747,23 @@ class JsonRecordBatch(object):
])
+# SKIP categories
+SKIP_ARROW = 'arrow'
+SKIP_FLIGHT = 'flight'
+
+
class JsonFile(object):
- def __init__(self, name, schema, batches, dictionaries=None):
+ def __init__(self, name, schema, batches, dictionaries=None,
+ skip=None, path=None):
self.name = name
self.schema = schema
self.dictionaries = dictionaries or []
self.batches = batches
+ self.skip = set()
+ self.path = path
+ if skip:
+ self.skip.update(skip)
def get_json(self):
entries = [
@@ -772,6 +782,15 @@ class JsonFile(object):
def write(self, path):
with open(path, 'wb') as f:
f.write(json.dumps(self.get_json(), indent=2).encode('utf-8'))
+ self.path = path
+
+ def skip_category(self, category):
+ """Skip this test for the given category.
+
+ Category should be SKIP_ARROW or SKIP_FLIGHT.
+ """
+ self.skip.add(category)
+ return self
def get_field(name, type_, nullable=True):
@@ -799,7 +818,7 @@ def get_field(name, type_, nullable=True):
raise TypeError(dtype)
-def _generate_file(name, fields, batch_sizes, dictionaries=None):
+def _generate_file(name, fields, batch_sizes, dictionaries=None, skip=None):
schema = JsonSchema(fields)
batches = []
for size in batch_sizes:
@@ -810,7 +829,7 @@ def _generate_file(name, fields, batch_sizes, dictionaries=None):
batches.append(JsonRecordBatch(size, columns))
- return JsonFile(name, schema, batches, dictionaries)
+ return JsonFile(name, schema, batches, dictionaries, skip=skip)
def generate_primitive_case(batch_sizes, name='primitive'):
@@ -908,11 +927,11 @@ def get_generated_json_files(tempdir=None, flight=False):
generate_decimal_case(),
generate_datetime_case(),
generate_nested_case(),
- generate_dictionary_case()
+ generate_dictionary_case().skip_category(SKIP_FLIGHT),
]
if flight:
- file_objs.append(generate_primitive_case([32 * 1024],
+ file_objs.append(generate_primitive_case([24 * 1024],
name='large_batch'))
generated_paths = []
@@ -920,7 +939,7 @@ def get_generated_json_files(tempdir=None, flight=False):
out_path = os.path.join(tempdir, 'generated_' +
file_obj.name + '.json')
file_obj.write(out_path)
- generated_paths.append(out_path)
+ generated_paths.append(file_obj)
return generated_paths
@@ -942,11 +961,8 @@ class IntegrationRunner(object):
for producer, consumer in itertools.product(
filter(lambda t: t.PRODUCER, self.testers),
filter(lambda t: t.CONSUMER, self.testers)):
- try:
- self._compare_implementations(producer, consumer)
- except Exception:
- traceback.print_exc()
- failures.append((producer, consumer, sys.exc_info()))
+ for failure in self._compare_implementations(producer, consumer):
+ failures.append(failure)
return failures
def run_flight(self):
@@ -967,7 +983,8 @@ class IntegrationRunner(object):
)
print('##########################################################')
- for json_path in self.json_files:
+ for test_case in self.json_files:
+ json_path = test_case.path
print('==========================================================')
print('Testing file {0}'.format(json_path))
print('==========================================================')
@@ -977,27 +994,37 @@ class IntegrationRunner(object):
file_id = guid()[:8]
# Make the random access file
- print('-- Creating binary inputs')
producer_file_path = os.path.join(self.temp_dir, file_id + '_' +
name + '.json_as_file')
- producer.json_to_file(json_path, producer_file_path)
-
- # Validate the file
- print('-- Validating file')
- consumer.validate(json_path, producer_file_path)
-
- print('-- Validating stream')
producer_stream_path = os.path.join(self.temp_dir, file_id + '_' +
name +
'.producer_file_as_stream')
consumer_file_path = os.path.join(self.temp_dir, file_id + '_' +
name +
'.consumer_stream_as_file')
- producer.file_to_stream(producer_file_path,
- producer_stream_path)
- consumer.stream_to_file(producer_stream_path,
- consumer_file_path)
- consumer.validate(json_path, consumer_file_path)
+
+ if SKIP_ARROW in test_case.skip:
+ print('-- Skipping test')
+ continue
+
+ try:
+ print('-- Creating binary inputs')
+ producer.json_to_file(json_path, producer_file_path)
+
+ # Validate the file
+ print('-- Validating file')
+ consumer.validate(json_path, producer_file_path)
+
+ print('-- Validating stream')
+ producer.file_to_stream(producer_file_path,
+ producer_stream_path)
+ consumer.stream_to_file(producer_stream_path,
+ consumer_file_path)
+ consumer.validate(json_path, consumer_file_path)
+ except Exception:
+ traceback.print_exc()
+ yield (test_case, producer, consumer, sys.exc_info())
+ continue
def _compare_flight_implementations(self, producer, consumer):
print('##########################################################')
@@ -1006,19 +1033,25 @@ class IntegrationRunner(object):
)
print('##########################################################')
- for json_path in self.json_files:
+ for test_case in self.json_files:
+ json_path = test_case.path
print('=' * 58)
print('Testing file {0}'.format(json_path))
print('=' * 58)
- with producer.flight_server():
- # Have the client upload the file, then download and
- # compare
- try:
+ if SKIP_FLIGHT in test_case.skip:
+ print('-- Skipping test')
+ continue
+
+ try:
+ with producer.flight_server():
+ # Have the client upload the file, then download and
+ # compare
consumer.flight_request(producer.FLIGHT_PORT, json_path)
- except Exception:
- traceback.print_exc()
- yield (producer, consumer, sys.exc_info())
+ except Exception:
+ traceback.print_exc()
+ yield (test_case, producer, consumer, sys.exc_info())
+ continue
class Tester(object):
@@ -1140,7 +1173,7 @@ class JavaTester(Tester):
output)
yield
finally:
- server.terminate()
+ server.kill()
server.wait(5)
@@ -1219,7 +1252,7 @@ class CPPTester(Tester):
output)
yield
finally:
- server.terminate()
+ server.kill()
server.wait(5)
def flight_request(self, port, json_path):
@@ -1295,7 +1328,9 @@ class JSTester(Tester):
def get_static_json_files():
glob_pattern = os.path.join(ARROW_HOME, 'integration', 'data', '*.json')
- return glob.glob(glob_pattern)
+ return [JsonFile(name=os.path.basename(p), path=p, skip=set(),
+ schema=None, batches=None)
+ for p in glob.glob(glob_pattern)]
def run_all_tests(run_flight=False, debug=False, tempdir=None):
@@ -1314,13 +1349,20 @@ def run_all_tests(run_flight=False, debug=False, tempdir=None):
if run_flight:
failures.extend(runner.run_flight())
+ fail_count = 0
if failures:
print("################# FAILURES #################")
- for producer, consumer, exc_info in failures:
- print("FAILED TEST:", producer.name, "producing, ",
+ for test_case, producer, consumer, exc_info in failures:
+ fail_count += 1
+ print("FAILED TEST:", end=" ")
+ print(test_case.name, producer.name, "producing, ",
consumer.name, "consuming")
- traceback.print_exception(*exc_info)
+ if exc_info:
+ traceback.print_exception(*exc_info)
print()
+
+ print(fail_count, "failures")
+ if fail_count > 0:
sys.exit(1)