You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jr...@apache.org on 2017/10/06 04:31:21 UTC

[1/2] incubator-impala git commit: IMPALA-5525 Extend TestScannersFuzzing to test uncompressed parquet

Repository: incubator-impala
Updated Branches:
  refs/heads/master c14a09040 -> ec957456d


IMPALA-5525 Extend TestScannersFuzzing to test uncompressed parquet

test_scanners_fuzz.py currently tests compressed parquet but
does not test uncompressed parquet. This fix adds a new test
case for uncompressed parquet.

Testing
-------
Ran query_test/test_scanners_fuzz.py in a loop (5 times);
no impalad crash was seen.

Change-Id: I760de7203a51cf82b16016fa8043cadc7c8325bc
Reviewed-on: http://gerrit.cloudera.org:8080/8056
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/d40047aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/d40047aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/d40047aa

Branch: refs/heads/master
Commit: d40047aa9b6aff6ef33c10ce5f05ebdac9b37e63
Parents: c14a090
Author: Pranay <ps...@cloudera.com>
Authored: Wed Sep 13 09:54:11 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Oct 6 00:32:17 2017 +0000

----------------------------------------------------------------------
 tests/query_test/test_scanners_fuzz.py | 66 ++++++++++++++++++++++-------
 1 file changed, 50 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d40047aa/tests/query_test/test_scanners_fuzz.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners_fuzz.py b/tests/query_test/test_scanners_fuzz.py
index 53fe348..c336a17 100644
--- a/tests/query_test/test_scanners_fuzz.py
+++ b/tests/query_test/test_scanners_fuzz.py
@@ -28,6 +28,7 @@ from subprocess import check_call
 from tests.common.test_dimensions import create_exec_option_dimension_from_dict
 from tests.common.impala_test_suite import ImpalaTestSuite, LOG
 from tests.util.filesystem_utils import WAREHOUSE, get_fs_path
+from tests.util.test_file_parser import QueryTestSectionReader
 
 # Random fuzz testing of HDFS scanners. Existing tables for any HDFS file format
 # are corrupted in random ways to flush out bugs with handling of corrupted data.
@@ -67,7 +68,10 @@ class TestScannersFuzzing(ImpalaTestSuite):
 
 
   def test_fuzz_alltypes(self, vector, unique_database):
-    self.run_fuzz_test(vector, unique_database, "alltypes")
+    table_format = vector.get_value('table_format')
+    src_db = QueryTestSectionReader.get_db_name(table_format)
+    table_name = "alltypes"
+    self.run_fuzz_test(vector, src_db, table_name, unique_database, table_name)
 
   def test_fuzz_decimal_tbl(self, vector, unique_database):
     table_format = vector.get_value('table_format')
@@ -82,16 +86,46 @@ class TestScannersFuzzing(ImpalaTestSuite):
       # decimal_tbl is not present for these file formats
       pytest.skip()
 
-    self.run_fuzz_test(vector, unique_database, table_name, 10)
+    src_db = QueryTestSectionReader.get_db_name(table_format)
+    self.run_fuzz_test(vector, src_db, table_name, unique_database, table_name, 10)
 
   def test_fuzz_nested_types(self, vector, unique_database):
     table_format = vector.get_value('table_format')
+    table_name = "complextypestbl"
+    src_db = QueryTestSectionReader.get_db_name(table_format)
+
+    if table_format.file_format != 'parquet': pytest.skip()
+    self.run_fuzz_test(vector, src_db, table_name, unique_database, table_name, 10)
+
+  def test_fuzz_uncompressed_parquet(self, vector, unique_database):
+    """Parquet tables in default schema are compressed, so in order
+       to do the fuzz_test on an uncompressed parquet table, this test
+       clones from an existing parquet table into a new table with
+       no compression.
+    """
+    table_format = vector.get_value('table_format')
+    if vector.get_value('table_format').compression_codec != 'none': pytest.skip()
     if table_format.file_format != 'parquet': pytest.skip()
-    self.run_fuzz_test(vector, unique_database, "complextypestbl", 10)
+
+    """Even when the compression_codec is none, the default compression type is snappy
+       so compression codec is changed explicitly to be none.
+    """
+    self.execute_query("set compression_codec=none")
+
+    tbl_list = ["alltypes", "decimal_tbl"]
+    for orig_tbl_name in tbl_list:
+      src_table_name = "parquet_uncomp_src_" + orig_tbl_name
+      fuzz_table_name = "parquet_uncomp_dst_" + orig_tbl_name
+      fq_tbl_name = unique_database + "." + src_table_name
+      create_tbl = ("create table {0} stored as parquet as select * from"
+          " functional_parquet.{1}".format(fq_tbl_name, orig_tbl_name))
+      self.execute_query(create_tbl)
+      self.run_fuzz_test(vector, unique_database, src_table_name, unique_database,
+          fuzz_table_name, 10)
 
   # TODO: add test coverage for additional data types like char and varchar
 
-  def run_fuzz_test(self, vector, unique_database, table, num_copies=1):
+  def run_fuzz_test(self, vector, src_db, src_table, fuzz_db, fuzz_table, num_copies=1):
     """ Do some basic fuzz testing: create a copy of an existing table with randomly
     corrupted files and make sure that we don't crash or behave in an unexpected way.
     'unique_database' is used for the table, so it will be cleaned up automatically.
@@ -106,27 +140,26 @@ class TestScannersFuzzing(ImpalaTestSuite):
     LOG.info("Using random seed %d", random_seed)
     rng.seed(long(random_seed))
 
-    table_format = vector.get_value('table_format')
-    self.change_database(self.client, table_format)
-
-    tmp_table_dir = tempfile.mkdtemp(prefix="tmp-scanner-fuzz-%s" % table,
+    tmp_table_dir = tempfile.mkdtemp(prefix="tmp-scanner-fuzz-%s" % fuzz_table,
         dir=os.path.join(os.environ['IMPALA_HOME'], "testdata"))
 
-    self.execute_query("create table %s.%s like %s" % (unique_database, table, table))
+    self.execute_query("create table %s.%s like %s.%s" % (fuzz_db, fuzz_table,
+        src_db, src_table))
     fuzz_table_location = get_fs_path("/test-warehouse/{0}.db/{1}".format(
-        unique_database, table))
+        fuzz_db, fuzz_table))
 
     LOG.info("Generating corrupted version of %s in %s. Local working directory is %s",
-        table, unique_database, tmp_table_dir)
+        fuzz_table, fuzz_db, tmp_table_dir)
 
     # Find the location of the existing table and get the full table directory structure.
-    table_loc = self._get_table_location(table, vector)
+    fq_table_name = src_db + "." + src_table
+    table_loc = self._get_table_location(fq_table_name, vector)
     check_call(['hdfs', 'dfs', '-copyToLocal', table_loc + "/*", tmp_table_dir])
 
     partitions = self.walk_and_corrupt_table_data(tmp_table_dir, num_copies, rng)
     for partition in partitions:
       self.execute_query('alter table {0}.{1} add partition ({2})'.format(
-          unique_database, table, ','.join(partition)))
+          fuzz_db, fuzz_table, ','.join(partition)))
 
     # Copy all of the local files and directories to hdfs.
     to_copy = ["%s/%s" % (tmp_table_dir, file_or_dir)
@@ -137,14 +170,14 @@ class TestScannersFuzzing(ImpalaTestSuite):
       shutil.rmtree(tmp_table_dir)
 
     # Querying the corrupted files should not DCHECK or crash.
-    self.execute_query("refresh %s.%s" % (unique_database, table))
+    self.execute_query("refresh %s.%s" % (fuzz_db, fuzz_table))
     # Execute a query that tries to read all the columns and rows in the file.
     # Also execute a count(*) that materializes no columns, since different code
     # paths are exercised.
     queries = [
         'select count(*) from (select distinct * from {0}.{1}) q'.format(
-            unique_database, table),
-        'select count(*) from {0}.{1} q'.format(unique_database, table)]
+            fuzz_db, fuzz_table),
+        'select count(*) from {0}.{1} q'.format(fuzz_db, fuzz_table)]
 
     for query, batch_size, disable_codegen in \
         itertools.product(queries, self.BATCH_SIZES, self.DISABLE_CODEGEN_VALUES):
@@ -164,6 +197,7 @@ class TestScannersFuzzing(ImpalaTestSuite):
         # Parquet and compressed text can fail the query for some parse errors.
         # E.g. corrupt Parquet footer (IMPALA-3773) or a corrupt LZO index file
         # (IMPALA-4013).
+        table_format = vector.get_value('table_format')
         if table_format.file_format != 'parquet' \
             and not (table_format.file_format == 'text' and
             table_format.compression_codec != 'none'):


[2/2] incubator-impala git commit: IMPALA-6019: Remove dead code parallel-executor*

Posted by jr...@apache.org.
IMPALA-6019: Remove dead code parallel-executor*

Change-Id: I7f0b121c2c40849937afa103dfedf0e0bef34477
Reviewed-on: http://gerrit.cloudera.org:8080/8206
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/ec957456
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/ec957456
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/ec957456

Branch: refs/heads/master
Commit: ec957456d26cf0935654e8a70dbc1f36e8c1adae
Parents: d40047a
Author: Dan Hecht <dh...@cloudera.com>
Authored: Wed Oct 4 10:28:44 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Oct 6 03:02:05 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/CMakeLists.txt            |  2 -
 be/src/runtime/parallel-executor-test.cc | 82 ---------------------------
 be/src/runtime/parallel-executor.cc      | 66 ---------------------
 be/src/runtime/parallel-executor.h       | 67 ----------------------
 4 files changed, 217 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ec957456/be/src/runtime/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 9c655ec..faed531 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -49,7 +49,6 @@ add_library(Runtime
   mem-tracker.cc
   mem-pool.cc
   multi-precision.cc
-  parallel-executor.cc
   query-exec-mgr.cc
   query-state.cc
   test-env.cc
@@ -83,7 +82,6 @@ ADD_BE_TEST(string-buffer-test)
 ADD_BE_TEST(data-stream-test)
 ADD_BE_TEST(timestamp-test)
 ADD_BE_TEST(disk-io-mgr-test)
-ADD_BE_TEST(parallel-executor-test)
 ADD_BE_TEST(raw-value-test)
 ADD_BE_TEST(string-compare-test)
 ADD_BE_TEST(string-search-test)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ec957456/be/src/runtime/parallel-executor-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/parallel-executor-test.cc b/be/src/runtime/parallel-executor-test.cc
deleted file mode 100644
index 8347939..0000000
--- a/be/src/runtime/parallel-executor-test.cc
+++ /dev/null
@@ -1,82 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-
-#include <string>
-#include <boost/bind.hpp>
-
-#include "runtime/parallel-executor.h"
-#include "testutil/gtest-util.h"
-#include "util/thread.h"
-
-#include "common/names.h"
-
-using namespace impala;
-
-namespace impala {
-
-class ParallelExecutorTest {
- public:
-  Status UpdateFunction(void* value) {
-    long arg = reinterpret_cast<long>(value);
-    EXPECT_FALSE(updates_found_[arg]);
-    updates_found_[arg] = true;
-
-    double result = 0;
-    // Run something random to keep this cpu a little busy
-    for (int i = 0; i < 10000; ++i) {
-      for (int j = 0; j < 200; ++j) {
-        result += sin(i) + cos(j);
-      }
-    }
-
-    return Status::OK();
-  }
-
-  ParallelExecutorTest(int num_updates) {
-    updates_found_.resize(num_updates);
-  }
-
-  void Validate() {
-    for (int i = 0; i < updates_found_.size(); ++i) {
-      EXPECT_TRUE(updates_found_[i]);
-    }
-  }
-
- private:
-  vector<int> updates_found_;
-};
-
-TEST(ParallelExecutorTest, Basic) {
-  int num_work_items = 100;
-  ParallelExecutorTest test_caller(num_work_items);
-
-  vector<long> args;
-  for (int i = 0; i < num_work_items; ++i) {
-    args.push_back(i);
-  }
-
-  EXPECT_OK(ParallelExecutor::Exec(
-      bind<Status>(mem_fn(&ParallelExecutorTest::UpdateFunction), &test_caller, _1),
-      reinterpret_cast<void**>(args.data()), args.size()));
-
-  test_caller.Validate();
-}
-
-}
-
-IMPALA_TEST_MAIN();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ec957456/be/src/runtime/parallel-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/parallel-executor.cc b/be/src/runtime/parallel-executor.cc
deleted file mode 100644
index f3dd708..0000000
--- a/be/src/runtime/parallel-executor.cc
+++ /dev/null
@@ -1,66 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/parallel-executor.h"
-
-#include <boost/thread/thread.hpp>
-
-#include "util/stopwatch.h"
-#include "util/thread.h"
-
-#include "common/names.h"
-
-using namespace impala;
-
-Status ParallelExecutor::Exec(Function function, void** args, int num_args,
-    StatsMetric<double>* latencies) {
-  Status status;
-  ThreadGroup worker_threads;
-  mutex lock;
-
-  for (int i = 0; i < num_args; ++i) {
-    stringstream ss;
-    ss << "worker-thread(" << i << ")";
-    std::unique_ptr<Thread> t;
-    Status thread_status = Thread::Create("parallel-executor", ss.str(),
-        &ParallelExecutor::Worker, function, args[i], &lock, &status, latencies, &t);
-    if (!thread_status.ok()) {
-      unique_lock<mutex> l(lock);
-      status = thread_status;
-      break;
-    }
-    worker_threads.AddThread(move(t));
-  }
-  worker_threads.JoinAll();
-
-  return status;
-}
-
-void ParallelExecutor::Worker(Function function, void* arg, mutex* lock, Status* status,
-    StatsMetric<double>* latencies) {
-  MonotonicStopWatch sw;
-  if (latencies != NULL) sw.Start();
-  Status local_status = function(arg);
-  if (!local_status.ok()) {
-    unique_lock<mutex> l(*lock);
-    if (status->ok()) *status = local_status;
-  }
-
-  if (latencies != NULL) {
-    latencies->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/ec957456/be/src/runtime/parallel-executor.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/parallel-executor.h b/be/src/runtime/parallel-executor.h
deleted file mode 100644
index ceb4c66..0000000
--- a/be/src/runtime/parallel-executor.h
+++ /dev/null
@@ -1,67 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-
-#ifndef IMPALA_RUNTIME_PARALLEL_EXECUTOR_H
-#define IMPALA_RUNTIME_PARALLEL_EXECUTOR_H
-
-#include <boost/function.hpp>
-#include <boost/thread/mutex.hpp>
-
-#include "common/status.h"
-#include "util/collection-metrics.h"
-
-namespace impala {
-
-/// This is a class that executes multiple functions in parallel with different arguments
-/// using a thread pool.
-/// TODO: look into an API for this.  Boost has one that is in review but not yet official.
-/// TODO: use a shared pool?  Thread creation is pretty cheap so this might not be
-/// worth it
-/// TODO: Consider rewriting in terms of ThreadPool
-class ParallelExecutor {
- public:
-  /// Typedef for the underlying function for the work.
-  /// The function must be thread safe.
-  /// The function must return a Status indicating if it was successful or not.
-  /// An example of how this function should be defined would be:
-  ///    static Status Foo::IssueRpc(void* arg);
-  /// TODO: there might some magical template way to do this with boost that is more
-  /// type safe.
-  typedef boost::function<Status (void* arg)> Function;
-
-  /// Calls function(args[i]) num_args times in parallel using num_args threads.
-  /// If any of the work item fails, returns the Status of the first failed work item.
-  /// Otherwise, returns Status::OK when all work items have been executed.
-  //
-  /// Callers may pass a StatsMetric to gather the latency distribution of task execution.
-  static Status Exec(Function function, void** args, int num_args,
-      StatsMetric<double>* latencies = NULL);
-
- private:
-  /// Worker thread function which calls function(arg).  This function updates
-  /// *status taking *lock to synchronize results from different threads.
-  //
-  /// If 'latencies' is not NULL, it is updated with the time elapsed while executing
-  /// 'function'.
-  static void Worker(Function function, void* arg, boost::mutex* lock, Status* status,
-      StatsMetric<double>* latencies);
-};
-
-}
-
-#endif