You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jr...@apache.org on 2017/10/06 04:31:21 UTC
[1/2] incubator-impala git commit: IMPALA-5525 Extend TestScannersFuzzing to test uncompressed parquet
Repository: incubator-impala
Updated Branches:
refs/heads/master c14a09040 -> ec957456d
IMPALA-5525 Extend TestScannersFuzzing to test uncompressed parquet
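test_scanners_fuzz.py currently tests compressed Parquet but does not
test uncompressed Parquet. This change adds a new test case,
test_fuzz_uncompressed_parquet, which copies existing Parquet tables
(alltypes, decimal_tbl) into new tables written with
compression_codec=none and fuzzes those copies. To support this,
run_fuzz_test() now takes separate source and destination
database/table names.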
Testing
-------
Ran query_test/test_scanners_fuzz.py in a loop (5 times); no impalad
crashes were observed.
Change-Id: I760de7203a51cf82b16016fa8043cadc7c8325bc
Reviewed-on: http://gerrit.cloudera.org:8080/8056
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/d40047aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/d40047aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/d40047aa
Branch: refs/heads/master
Commit: d40047aa9b6aff6ef33c10ce5f05ebdac9b37e63
Parents: c14a090
Author: Pranay <ps...@cloudera.com>
Authored: Wed Sep 13 09:54:11 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Oct 6 00:32:17 2017 +0000
----------------------------------------------------------------------
tests/query_test/test_scanners_fuzz.py | 66 ++++++++++++++++++++++-------
1 file changed, 50 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d40047aa/tests/query_test/test_scanners_fuzz.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners_fuzz.py b/tests/query_test/test_scanners_fuzz.py
index 53fe348..c336a17 100644
--- a/tests/query_test/test_scanners_fuzz.py
+++ b/tests/query_test/test_scanners_fuzz.py
@@ -28,6 +28,7 @@ from subprocess import check_call
from tests.common.test_dimensions import create_exec_option_dimension_from_dict
from tests.common.impala_test_suite import ImpalaTestSuite, LOG
from tests.util.filesystem_utils import WAREHOUSE, get_fs_path
+from tests.util.test_file_parser import QueryTestSectionReader
# Random fuzz testing of HDFS scanners. Existing tables for any HDFS file format
# are corrupted in random ways to flush out bugs with handling of corrupted data.
@@ -67,7 +68,10 @@ class TestScannersFuzzing(ImpalaTestSuite):
def test_fuzz_alltypes(self, vector, unique_database):
- self.run_fuzz_test(vector, unique_database, "alltypes")
+ table_format = vector.get_value('table_format')
+ src_db = QueryTestSectionReader.get_db_name(table_format)
+ table_name = "alltypes"
+ self.run_fuzz_test(vector, src_db, table_name, unique_database, table_name)
def test_fuzz_decimal_tbl(self, vector, unique_database):
table_format = vector.get_value('table_format')
@@ -82,16 +86,46 @@ class TestScannersFuzzing(ImpalaTestSuite):
# decimal_tbl is not present for these file formats
pytest.skip()
- self.run_fuzz_test(vector, unique_database, table_name, 10)
+ src_db = QueryTestSectionReader.get_db_name(table_format)
+ self.run_fuzz_test(vector, src_db, table_name, unique_database, table_name, 10)
def test_fuzz_nested_types(self, vector, unique_database):
table_format = vector.get_value('table_format')
+ table_name = "complextypestbl"
+ src_db = QueryTestSectionReader.get_db_name(table_format)
+
+ if table_format.file_format != 'parquet': pytest.skip()
+ self.run_fuzz_test(vector, src_db, table_name, unique_database, table_name, 10)
+
+ def test_fuzz_uncompressed_parquet(self, vector, unique_database):
+ """Parquet tables in default schema are compressed, so in order
+ to do the fuzz_test on an uncompressed parquet table, this test
+ clones from an existing parquet table into a new table with
+ no compression.
+ """
+ table_format = vector.get_value('table_format')
+ if vector.get_value('table_format').compression_codec != 'none': pytest.skip()
if table_format.file_format != 'parquet': pytest.skip()
- self.run_fuzz_test(vector, unique_database, "complextypestbl", 10)
+
+ """Even when the compression_codec is none, the default compression type is snappy
+ so compression codec is changed explicitly to be none.
+ """
+ self.execute_query("set compression_codec=none")
+
+ tbl_list = ["alltypes", "decimal_tbl"]
+ for orig_tbl_name in tbl_list:
+ src_table_name = "parquet_uncomp_src_" + orig_tbl_name
+ fuzz_table_name = "parquet_uncomp_dst_" + orig_tbl_name
+ fq_tbl_name = unique_database + "." + src_table_name
+ create_tbl = ("create table {0} stored as parquet as select * from"
+ " functional_parquet.{1}".format(fq_tbl_name, orig_tbl_name))
+ self.execute_query(create_tbl)
+ self.run_fuzz_test(vector, unique_database, src_table_name, unique_database,
+ fuzz_table_name, 10)
# TODO: add test coverage for additional data types like char and varchar
- def run_fuzz_test(self, vector, unique_database, table, num_copies=1):
+ def run_fuzz_test(self, vector, src_db, src_table, fuzz_db, fuzz_table, num_copies=1):
""" Do some basic fuzz testing: create a copy of an existing table with randomly
corrupted files and make sure that we don't crash or behave in an unexpected way.
'unique_database' is used for the table, so it will be cleaned up automatically.
@@ -106,27 +140,26 @@ class TestScannersFuzzing(ImpalaTestSuite):
LOG.info("Using random seed %d", random_seed)
rng.seed(long(random_seed))
- table_format = vector.get_value('table_format')
- self.change_database(self.client, table_format)
-
- tmp_table_dir = tempfile.mkdtemp(prefix="tmp-scanner-fuzz-%s" % table,
+ tmp_table_dir = tempfile.mkdtemp(prefix="tmp-scanner-fuzz-%s" % fuzz_table,
dir=os.path.join(os.environ['IMPALA_HOME'], "testdata"))
- self.execute_query("create table %s.%s like %s" % (unique_database, table, table))
+ self.execute_query("create table %s.%s like %s.%s" % (fuzz_db, fuzz_table,
+ src_db, src_table))
fuzz_table_location = get_fs_path("/test-warehouse/{0}.db/{1}".format(
- unique_database, table))
+ fuzz_db, fuzz_table))
LOG.info("Generating corrupted version of %s in %s. Local working directory is %s",
- table, unique_database, tmp_table_dir)
+ fuzz_table, fuzz_db, tmp_table_dir)
# Find the location of the existing table and get the full table directory structure.
- table_loc = self._get_table_location(table, vector)
+ fq_table_name = src_db + "." + src_table
+ table_loc = self._get_table_location(fq_table_name, vector)
check_call(['hdfs', 'dfs', '-copyToLocal', table_loc + "/*", tmp_table_dir])
partitions = self.walk_and_corrupt_table_data(tmp_table_dir, num_copies, rng)
for partition in partitions:
self.execute_query('alter table {0}.{1} add partition ({2})'.format(
- unique_database, table, ','.join(partition)))
+ fuzz_db, fuzz_table, ','.join(partition)))
# Copy all of the local files and directories to hdfs.
to_copy = ["%s/%s" % (tmp_table_dir, file_or_dir)
@@ -137,14 +170,14 @@ class TestScannersFuzzing(ImpalaTestSuite):
shutil.rmtree(tmp_table_dir)
# Querying the corrupted files should not DCHECK or crash.
- self.execute_query("refresh %s.%s" % (unique_database, table))
+ self.execute_query("refresh %s.%s" % (fuzz_db, fuzz_table))
# Execute a query that tries to read all the columns and rows in the file.
# Also execute a count(*) that materializes no columns, since different code
# paths are exercised.
queries = [
'select count(*) from (select distinct * from {0}.{1}) q'.format(
- unique_database, table),
- 'select count(*) from {0}.{1} q'.format(unique_database, table)]
+ fuzz_db, fuzz_table),
+ 'select count(*) from {0}.{1} q'.format(fuzz_db, fuzz_table)]
for query, batch_size, disable_codegen in \
itertools.product(queries, self.BATCH_SIZES, self.DISABLE_CODEGEN_VALUES):
@@ -164,6 +197,7 @@ class TestScannersFuzzing(ImpalaTestSuite):
# Parquet and compressed text can fail the query for some parse errors.
# E.g. corrupt Parquet footer (IMPALA-3773) or a corrupt LZO index file
# (IMPALA-4013).
+ table_format = vector.get_value('table_format')
if table_format.file_format != 'parquet' \
and not (table_format.file_format == 'text' and
table_format.compression_codec != 'none'):
[2/2] incubator-impala git commit: IMPALA-6019: Remove dead code parallel-executor*
Posted by jr...@apache.org.
IMPALA-6019: Remove dead code parallel-executor*
Change-Id: I7f0b121c2c40849937afa103dfedf0e0bef34477
Reviewed-on: http://gerrit.cloudera.org:8080/8206
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/ec957456
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/ec957456
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/ec957456
Branch: refs/heads/master
Commit: ec957456d26cf0935654e8a70dbc1f36e8c1adae
Parents: d40047a
Author: Dan Hecht <dh...@cloudera.com>
Authored: Wed Oct 4 10:28:44 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Oct 6 03:02:05 2017 +0000
----------------------------------------------------------------------
be/src/runtime/CMakeLists.txt | 2 -
be/src/runtime/parallel-executor-test.cc | 82 ---------------------------
be/src/runtime/parallel-executor.cc | 66 ---------------------
be/src/runtime/parallel-executor.h | 67 ----------------------
4 files changed, 217 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ec957456/be/src/runtime/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 9c655ec..faed531 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -49,7 +49,6 @@ add_library(Runtime
mem-tracker.cc
mem-pool.cc
multi-precision.cc
- parallel-executor.cc
query-exec-mgr.cc
query-state.cc
test-env.cc
@@ -83,7 +82,6 @@ ADD_BE_TEST(string-buffer-test)
ADD_BE_TEST(data-stream-test)
ADD_BE_TEST(timestamp-test)
ADD_BE_TEST(disk-io-mgr-test)
-ADD_BE_TEST(parallel-executor-test)
ADD_BE_TEST(raw-value-test)
ADD_BE_TEST(string-compare-test)
ADD_BE_TEST(string-search-test)
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ec957456/be/src/runtime/parallel-executor-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/parallel-executor-test.cc b/be/src/runtime/parallel-executor-test.cc
deleted file mode 100644
index 8347939..0000000
--- a/be/src/runtime/parallel-executor-test.cc
+++ /dev/null
@@ -1,82 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-
-#include <string>
-#include <boost/bind.hpp>
-
-#include "runtime/parallel-executor.h"
-#include "testutil/gtest-util.h"
-#include "util/thread.h"
-
-#include "common/names.h"
-
-using namespace impala;
-
-namespace impala {
-
-class ParallelExecutorTest {
- public:
- Status UpdateFunction(void* value) {
- long arg = reinterpret_cast<long>(value);
- EXPECT_FALSE(updates_found_[arg]);
- updates_found_[arg] = true;
-
- double result = 0;
- // Run something random to keep this cpu a little busy
- for (int i = 0; i < 10000; ++i) {
- for (int j = 0; j < 200; ++j) {
- result += sin(i) + cos(j);
- }
- }
-
- return Status::OK();
- }
-
- ParallelExecutorTest(int num_updates) {
- updates_found_.resize(num_updates);
- }
-
- void Validate() {
- for (int i = 0; i < updates_found_.size(); ++i) {
- EXPECT_TRUE(updates_found_[i]);
- }
- }
-
- private:
- vector<int> updates_found_;
-};
-
-TEST(ParallelExecutorTest, Basic) {
- int num_work_items = 100;
- ParallelExecutorTest test_caller(num_work_items);
-
- vector<long> args;
- for (int i = 0; i < num_work_items; ++i) {
- args.push_back(i);
- }
-
- EXPECT_OK(ParallelExecutor::Exec(
- bind<Status>(mem_fn(&ParallelExecutorTest::UpdateFunction), &test_caller, _1),
- reinterpret_cast<void**>(args.data()), args.size()));
-
- test_caller.Validate();
-}
-
-}
-
-IMPALA_TEST_MAIN();
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ec957456/be/src/runtime/parallel-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/parallel-executor.cc b/be/src/runtime/parallel-executor.cc
deleted file mode 100644
index f3dd708..0000000
--- a/be/src/runtime/parallel-executor.cc
+++ /dev/null
@@ -1,66 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/parallel-executor.h"
-
-#include <boost/thread/thread.hpp>
-
-#include "util/stopwatch.h"
-#include "util/thread.h"
-
-#include "common/names.h"
-
-using namespace impala;
-
-Status ParallelExecutor::Exec(Function function, void** args, int num_args,
- StatsMetric<double>* latencies) {
- Status status;
- ThreadGroup worker_threads;
- mutex lock;
-
- for (int i = 0; i < num_args; ++i) {
- stringstream ss;
- ss << "worker-thread(" << i << ")";
- std::unique_ptr<Thread> t;
- Status thread_status = Thread::Create("parallel-executor", ss.str(),
- &ParallelExecutor::Worker, function, args[i], &lock, &status, latencies, &t);
- if (!thread_status.ok()) {
- unique_lock<mutex> l(lock);
- status = thread_status;
- break;
- }
- worker_threads.AddThread(move(t));
- }
- worker_threads.JoinAll();
-
- return status;
-}
-
-void ParallelExecutor::Worker(Function function, void* arg, mutex* lock, Status* status,
- StatsMetric<double>* latencies) {
- MonotonicStopWatch sw;
- if (latencies != NULL) sw.Start();
- Status local_status = function(arg);
- if (!local_status.ok()) {
- unique_lock<mutex> l(*lock);
- if (status->ok()) *status = local_status;
- }
-
- if (latencies != NULL) {
- latencies->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ec957456/be/src/runtime/parallel-executor.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/parallel-executor.h b/be/src/runtime/parallel-executor.h
deleted file mode 100644
index ceb4c66..0000000
--- a/be/src/runtime/parallel-executor.h
+++ /dev/null
@@ -1,67 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-
-#ifndef IMPALA_RUNTIME_PARALLEL_EXECUTOR_H
-#define IMPALA_RUNTIME_PARALLEL_EXECUTOR_H
-
-#include <boost/function.hpp>
-#include <boost/thread/mutex.hpp>
-
-#include "common/status.h"
-#include "util/collection-metrics.h"
-
-namespace impala {
-
-/// This is a class that executes multiple functions in parallel with different arguments
-/// using a thread pool.
-/// TODO: look into an API for this. Boost has one that is in review but not yet official.
-/// TODO: use a shared pool? Thread creation is pretty cheap so this might not be
-/// worth it
-/// TODO: Consider rewriting in terms of ThreadPool
-class ParallelExecutor {
- public:
- /// Typedef for the underlying function for the work.
- /// The function must be thread safe.
- /// The function must return a Status indicating if it was successful or not.
- /// An example of how this function should be defined would be:
- /// static Status Foo::IssueRpc(void* arg);
- /// TODO: there might some magical template way to do this with boost that is more
- /// type safe.
- typedef boost::function<Status (void* arg)> Function;
-
- /// Calls function(args[i]) num_args times in parallel using num_args threads.
- /// If any of the work item fails, returns the Status of the first failed work item.
- /// Otherwise, returns Status::OK when all work items have been executed.
- //
- /// Callers may pass a StatsMetric to gather the latency distribution of task execution.
- static Status Exec(Function function, void** args, int num_args,
- StatsMetric<double>* latencies = NULL);
-
- private:
- /// Worker thread function which calls function(arg). This function updates
- /// *status taking *lock to synchronize results from different threads.
- //
- /// If 'latencies' is not NULL, it is updated with the time elapsed while executing
- /// 'function'.
- static void Worker(Function function, void* arg, boost::mutex* lock, Status* status,
- StatsMetric<double>* latencies);
-};
-
-}
-
-#endif