Posted to issues-all@impala.apache.org by "Pooja Nilangekar (JIRA)" <ji...@apache.org> on 2018/08/23 18:15:00 UTC

[jira] [Comment Edited] (IMPALA-7335) Assertion Failure - test_corrupt_files

    [ https://issues.apache.org/jira/browse/IMPALA-7335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16589578#comment-16589578 ] 

Pooja Nilangekar edited comment on IMPALA-7335 at 8/23/18 6:14 PM:
-------------------------------------------------------------------

From my understanding of the HdfsScanNode, here is the set of transitions/dependencies among the state variables:
 # HdfsScanNode::thread_state_::batch_queue_::shutdown_ is modified only once (false -> true), in HdfsScanNode::SetDoneInternal(). When its value changes, it is guaranteed that done_ has been set to true and reader_context_ has been cancelled. It is read in the GetBatch() and AddMaterializedRowBatch() functions. *It acts as a point of synchronization for GetNext(), since GetNext() invokes GetNextInternal(), which invokes GetBatch(). If the batch returned is NULL, status_ is returned to the FragmentInstanceState.*
 # HdfsScanNodeBase::progress_ is initialized in HdfsScanNode::Open() and decremented each time HdfsScanner::Close() is called; it is only ever decremented after initialization. *When the ScannerThread() or GetNext() functions see its value reach 0, they invoke SetDone(), which transitions the done_ flag, cancels all ranges in reader_context_, and sets the shutdown_ flag to true.* It is also used in ThreadTokenAvailableCb() to avoid spawning excessive threads; however, this cannot lead to any non-trivial failure, since an extra spawned thread can simply terminate.
 # HdfsScanNode::status_ transitions in two places: ThreadTokenAvailableCb() [when thread creation fails] and ScannerThread() [when ProcessSplit() returns an error]. *In both cases, the modification of status_ is followed by a call to SetDoneInternal().*
 # HdfsScanNode::done_ transitions only via SetDoneInternal(). However, it is used by multiple threads to signal the completion of the scan (which can be due to completing all ranges, encountering a non-recoverable error, or reaching the scan node's limit).
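
The transitions above can be sketched as a single-mutex model. This is a hypothetical simplification for illustration only: the member names mirror Impala's, but this is not the real HdfsScanNode, and the locking is collapsed into one mutex.

```cpp
#include <cassert>
#include <mutex>
#include <string>

// Hypothetical model of the HdfsScanNode state transitions described above.
class ScanNodeModel {
 public:
  explicit ScanNodeModel(int num_ranges) : progress_(num_ranges) {}

  // Error path (ThreadTokenAvailableCb()/ScannerThread() in the comment):
  // record a status, then transition done_/shutdown_.
  void SetDone(const std::string& status) {
    std::lock_guard<std::mutex> l(lock_);
    SetDoneInternal(status);
  }

  // ScannerThread() path: progress_ is decremented after each
  // HdfsScanner::Close(); reaching 0 shuts the node down with an OK status.
  void ScannerClosed() {
    std::lock_guard<std::mutex> l(lock_);
    if (--progress_ == 0) SetDoneInternal("");
  }

  bool done() { std::lock_guard<std::mutex> l(lock_); return done_; }
  bool shutdown() { std::lock_guard<std::mutex> l(lock_); return shutdown_; }
  std::string status() { std::lock_guard<std::mutex> l(lock_); return status_; }

 private:
  // The only place done_ and shutdown_ transition (mirrors SetDoneInternal()).
  void SetDoneInternal(const std::string& status) {
    if (done_) return;                      // idempotent: later callers no-op
    if (!status.empty()) status_ = status;  // only the first error is recorded
    done_ = true;                           // 1. done_ transitions
    reader_cancelled_ = true;               // 2. reader_context_ is cancelled
    shutdown_ = true;                       // 3. batch_queue_ is shut down
  }

  std::mutex lock_;
  int progress_;                 // unfinished scan ranges
  bool done_ = false;
  bool shutdown_ = false;
  bool reader_cancelled_ = false;
  std::string status_;           // "" means OK
};
```

In this model, once done_ is set the later progress_-driven call to SetDoneInternal("") returns early, so the first recorded status survives.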

Here are the dependencies among the transitions.

progress_ --> done_ --> shutdown_
status_ --> done_ --> shutdown_

Additionally, progress_ --> status_ is a dependency which is not deterministically followed. One approach to fix the race would be to update status_ and invoke SetDoneInternal() in HdfsScanNode::ProcessSplit() before invoking scanner->Close() in case of failures (a similar flow was part of the code before IMPALA-2667). This would ensure that the first non-OK status always updates status_ before the done_ and shutdown_ flags are set.
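
The proposed ordering fix can be sketched as follows. Again a hypothetical simplification, assuming a single lock guards the state; the names mirror Impala's, but this is not the real ProcessSplit().

```cpp
#include <cassert>
#include <mutex>
#include <string>

// Minimal state for illustrating the proposed fix.
struct ScanState {
  std::mutex lock;
  std::string status;   // first non-OK status; "" means OK
  bool done = false;
  int progress;         // unfinished scan ranges

  explicit ScanState(int ranges) : progress(ranges) {}

  // Caller must hold lock. First caller wins; later calls are no-ops.
  void SetDoneInternal(const std::string& s) {
    if (done) return;
    if (!s.empty()) status = s;
    done = true;
  }
};

// ProcessSplit() as proposed: on failure, publish the error status and
// transition done_ BEFORE the scanner is closed, because closing the scanner
// decrements progress_ and may trigger the OK shutdown path.
void ProcessSplit(ScanState* state, const std::string& scan_status) {
  if (!scan_status.empty()) {
    std::lock_guard<std::mutex> l(state->lock);
    state->SetDoneInternal(scan_status);  // error is recorded first
  }
  // scanner->Close() equivalent: last range finishing triggers the
  // progress_-driven shutdown, which can no longer overwrite the error.
  std::lock_guard<std::mutex> l(state->lock);
  if (--state->progress == 0) state->SetDoneInternal("");
}
```

Because SetDoneInternal() is idempotent, the later progress_-driven call cannot replace the error status with OK, which is exactly the ordering guarantee the fix is after.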



> Assertion Failure - test_corrupt_files
> --------------------------------------
>
>                 Key: IMPALA-7335
>                 URL: https://issues.apache.org/jira/browse/IMPALA-7335
>             Project: IMPALA
>          Issue Type: Bug
>    Affects Versions: Impala 3.1.0
>            Reporter: nithya
>            Assignee: Pooja Nilangekar
>            Priority: Blocker
>              Labels: broken-build
>
> test_corrupt_files fails 
>  
> query_test.test_scanners.TestParquet.test_corrupt_files[exec_option: {'batch_size': 0, 'num_nodes': 0, 'disable_codegen_rows_threshold': 0, 'disable_codegen': False, 'abort_on_error': 1, 'debug_action': None, 'exec_single_node_rows_threshold': 0} | table_format: parquet/none] (from pytest)
>  
> {code:java}
> Error Message
> query_test/test_scanners.py:300: in test_corrupt_files     self.run_test_case('QueryTest/parquet-abort-on-error', vector) common/impala_test_suite.py:420: in run_test_case     assert False, "Expected exception: %s" % expected_str E   AssertionError: Expected exception: Column metadata states there are 11 values, but read 10 values from column id.
> STACKTRACE
> query_test/test_scanners.py:300: in test_corrupt_files
>     self.run_test_case('QueryTest/parquet-abort-on-error', vector)
> common/impala_test_suite.py:420: in run_test_case
>     assert False, "Expected exception: %s" % expected_str
> E   AssertionError: Expected exception: Column metadata states there are 11 values, but read 10 values from column id.
> Standard Error
> -- executing against localhost:21000
> use functional_parquet;
> SET batch_size=0;
> SET num_nodes=0;
> SET disable_codegen_rows_threshold=0;
> SET disable_codegen=False;
> SET abort_on_error=0;
> SET exec_single_node_rows_threshold=0;
> -- executing against localhost:21000
> set num_nodes=1;
> -- executing against localhost:21000
> set num_scanner_threads=1;
> -- executing against localhost:21000
> select id, cnt from bad_column_metadata t, (select count(*) cnt from t.int_array) v;
> -- executing against localhost:21000
> SET NUM_NODES="0";
> -- executing against localhost:21000
> SET NUM_SCANNER_THREADS="0";
> -- executing against localhost:21000
> set num_nodes=1;
> -- executing against localhost:21000
> set num_scanner_threads=1;
> -- executing against localhost:21000
> select id from bad_column_metadata;
> -- executing against localhost:21000
> SET NUM_NODES="0";
> -- executing against localhost:21000
> SET NUM_SCANNER_THREADS="0";
> -- executing against localhost:21000
> SELECT * from bad_parquet_strings_negative_len;
> -- executing against localhost:21000
> SELECT * from bad_parquet_strings_out_of_bounds;
> -- executing against localhost:21000
> use functional_parquet;
> SET batch_size=0;
> SET num_nodes=0;
> SET disable_codegen_rows_threshold=0;
> SET disable_codegen=False;
> SET abort_on_error=1;
> SET exec_single_node_rows_threshold=0;
> -- executing against localhost:21000
> set num_nodes=1;
> -- executing against localhost:21000
> set num_scanner_threads=1;
> -- executing against localhost:21000
> select id, cnt from bad_column_metadata t, (select count(*) cnt from t.int_array) v;
> -- executing against localhost:21000
> SET NUM_NODES="0";
> -- executing against localhost:21000
> SET NUM_SCANNER_THREADS="0";
> -- executing against localhost:21000
> set num_nodes=1;
> -- executing against localhost:21000
> set num_scanner_threads=1;
> -- executing against localhost:21000
> select id from bad_column_metadata;
> -- executing against localhost:21000
> SET NUM_NODES="0";
> -- executing against localhost:21000
> SET NUM_SCANNER_THREADS="0";
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-all-unsubscribe@impala.apache.org
For additional commands, e-mail: issues-all-help@impala.apache.org