Posted to issues@trafodion.apache.org by "Sandhya Sundaresan (JIRA)" <ji...@apache.org> on 2018/01/18 20:01:01 UTC

[jira] [Commented] (TRAFODION-2917) Refactor Trafodion implementation of hdfs scan for text formatted hive tables

    [ https://issues.apache.org/jira/browse/TRAFODION-2917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16331099#comment-16331099 ] 

Sandhya Sundaresan commented on TRAFODION-2917:
-----------------------------------------------

A description of the current flow is below. As you can see, the opens, the reads, and the main-thread processing are tightly coordinated, so there is no waiting anywhere. If the new implementation can retain this level of parallel reading while being much simpler, it is worth investing the time to get rid of this code, which is quite difficult to debug. However, since we have already ironed out many hangs and bugs here, I think we should keep the existing code behind a toggle for a while and switch over to the new code only after it has had some exposure and extensive testing.

 

Here is how the flow currently goes:

1) The main thread opens one range (64 MB). This open sets up some initial structures, opens the hdfs file, and kicks off one worker thread on its mission to start fetching, say, 16 KB buffers. It does not wait for the worker thread to complete the fetches; it just returns.

 

2) The main thread determines that there will be one more range to work on after the current one completes, so it calls pre-open on the next range (i.e., the next 128MB, a new file). This pre-open request just puts a new pre-open object on the pre-open list; it does nothing else at this point.

 

3) The main thread then issues a read. Since the worker thread already began fetching 16 KB buffers in (1), the data will most likely be ready and the main thread will not need to wait. It keeps consuming the buffers and recycling them back into postFetchBufList.

 

4) When done, the main thread closes the cursor.
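
Here is a minimal, self-contained C++ sketch of steps (1)-(4) above, with toy in-memory data standing in for hdfs I/O; names like RangeReader, prefetchBufList_, and kBufLimit are illustrative, not the actual Trafodion identifiers:

#include <condition_variable>
#include <cstdio>
#include <deque>
#include <mutex>
#include <string>
#include <thread>

struct Buf { long offset; std::string data; };     // one 16 KB fetch

class RangeReader {
  std::mutex mu_;
  std::condition_variable cv_;
  std::deque<Buf> prefetchBufList_;  // filled by the worker, in offset order
  bool done_ = false;                // worker finished the range
  std::thread worker_;
  static constexpr size_t kBufLimit = 4;  // backpressure: stay this far ahead

public:
  // Step 1: open() kicks off the worker and returns immediately.
  void open(long rangeStart, long rangeLen, long chunk) {
    worker_ = std::thread([=] {
      for (long off = rangeStart; off < rangeStart + rangeLen; off += chunk) {
        Buf b{off, "bytes@" + std::to_string(off)};   // stand-in for hdfs I/O
        std::unique_lock<std::mutex> lk(mu_);
        cv_.wait(lk, [&] { return prefetchBufList_.size() < kBufLimit; });
        prefetchBufList_.push_back(std::move(b));     // sequential offsets
        cv_.notify_all();
      }
      std::lock_guard<std::mutex> lk(mu_);
      done_ = true;                                   // range fully fetched
      cv_.notify_all();
    });
  }

  // Step 3: consume the next buffer; usually ready, so no waiting.
  bool read(Buf& out) {
    std::unique_lock<std::mutex> lk(mu_);
    cv_.wait(lk, [&] { return !prefetchBufList_.empty() || done_; });
    if (prefetchBufList_.empty()) return false;       // range exhausted
    out = std::move(prefetchBufList_.front());
    prefetchBufList_.pop_front();
    cv_.notify_all();          // the real code recycles into postFetchBufList
    return true;
  }

  // Step 4: close the cursor; here that just reaps the worker.
  void close() { if (worker_.joinable()) worker_.join(); }
};

int main() {
  RangeReader r;
  r.open(0, 64, 16);           // one toy "range", fetched in 16-byte chunks
  Buf b;
  while (r.read(b)) std::printf("%ld: %s\n", b.offset, b.data.c_str());
  r.close();
}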

 

What goes on in the worker thread when it gets kicked off in (1):

 

1) The worker thread reads, say, 16 KB of data at a time and buffers it in prefetchBufList. It continues doing this until the buffer limit is reached, the range is completely fetched, or it is asked prematurely to stop reading.

 

2) After the entire range is read, the worker processes the pre-open request that was queued in (2) above. A pre-open is simply another open, which kicks off the second worker thread to start fetching the next range; the first worker thread then goes back to sleep. The pre-open is necessary to avoid a delay when going from one range to another: the cost otherwise would be the I/O time to open the new hdfs file, position to the correct offset within it, and only then kick off the next worker thread. With pre-open, the main thread sees a continuous stream of already-fetched data when it does the next open.
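
A sketch of just this pre-open handoff, again with illustrative names:

#include <chrono>
#include <deque>
#include <mutex>
#include <string>
#include <thread>

struct PreOpenReq { std::string file; long start; long len; };

std::mutex gMu;
std::deque<PreOpenReq> gPreOpenList;   // main thread queues pre-opens here

void prefetchRange(PreOpenReq r);      // worker body, stubbed below

// Called by the finishing worker once its range is fully read.
void handoffToNextWorker() {
  PreOpenReq next;
  {
    std::lock_guard<std::mutex> lk(gMu);
    if (gPreOpenList.empty()) return;  // nothing queued: go back to sleep
    next = gPreOpenList.front();
    gPreOpenList.pop_front();
  }
  // The other worker opens the file, seeks to next.start, and starts
  // prefetching right away, so the main thread's next open finds data
  // already flowing instead of paying open+seek latency.
  std::thread(prefetchRange, next).detach();
}

void prefetchRange(PreOpenReq r) {
  // Stub: open r.file, position to r.start, fetch 16 KB buffers for
  // r.len bytes, then hand off to the next queued range (if any).
  (void)r;
  handoffToNextWorker();
}

int main() {
  gPreOpenList.push_back({"part-00000", 0, 64L * 1024 * 1024});
  handoffToNextWorker();               // simulate a worker finishing its range
  std::this_thread::sleep_for(std::chrono::milliseconds(100));
}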

 

So with these two parallel activities, data is always prefetched up to one range in advance, but no more, because a new pre-open request is only scheduled when the main thread does a new open.

 

Only one worker thread at a time reads data and puts it in prefetchBufList. This is because the buffers in prefetchBufList are ordered sequentially by the offset of the 16 KB reads from the hdfs file; the list cannot hold out-of-order buffers, because the main thread assumes that each buffer it reads begins where the previous one ended. The second worker thread is needed so that when the first one processes a pre-open, it can kick off the second to start prefetching.

 

Worker threads go away when a shutdown request is scheduled. This happens in the LobGlobals destructor, which gets called as part of the TCB destructor.
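
A minimal sketch of that shutdown handshake, with illustrative names (in the real code the logic sits in the LobGlobals destructor, reached from the TCB destructor):

#include <condition_variable>
#include <mutex>
#include <thread>

class WorkerPool {
  std::mutex mu_;
  std::condition_variable cv_;
  bool shutdown_ = false;
  std::thread worker_{[this] {          // a worker sleeping between ranges
    std::unique_lock<std::mutex> lk(mu_);
    while (!shutdown_)
      cv_.wait(lk);                     // woken by work or by shutdown
  }};

public:
  ~WorkerPool() {                       // cf. LobGlobals dtor via TCB dtor
    { std::lock_guard<std::mutex> lk(mu_); shutdown_ = true; }
    cv_.notify_all();                   // wake the sleeping worker
    worker_.join();                     // thread is gone before we return
  }
};

int main() { WorkerPool p; }            // destruction runs the handshake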

> Refactor Trafodion implementation of hdfs scan for text formatted hive tables
> -----------------------------------------------------------------------------
>
>                 Key: TRAFODION-2917
>                 URL: https://issues.apache.org/jira/browse/TRAFODION-2917
>             Project: Apache Trafodion
>          Issue Type: New Feature
>          Components: sql-general
>            Reporter: Selvaganesan Govindarajan
>            Priority: Major
>             Fix For: 2.3
>
>
> Find below the general outline of the hdfs scan for text-formatted hive tables.
> The compiler returns, in the TDB, a list of scan ranges plus the begin range and the number of ranges to be done by each TCB instance. This list of scan ranges may also be re-computed at run time, possibly based on a CQD.
> The scan ranges for a TCB can come from the same hdfs file or from different ones. The TCB creates two threads to read these ranges, and two of the TCB's ranges are initially assigned to them. As each range is completed, the thread picks up the next range assigned to the TCB. Ranges are read in multiples of the hdfs scan buffer size at the TCB level; the default hdfs scan buffer size is 64 MB. Rows from the hdfs scan buffer are processed and moved into the up queue. If a range ends in the middle of a record (a record split), the range is extended by up to the range tail IO size to read the full row; the range holding the latter part of that row ignores it, because the former range processes it. A record split at the file level is not possible and/or not supported. (A sketch of this rule follows.)
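> A small sketch of the ownership rule for split rows, assuming newline-delimited text and hypothetical names:
>
> #include <cstddef>
>
> // A range owns every row that begins inside it. A range that does not
> // start the file skips its leading partial row, because the previous
> // range reads past its own end (by up to the range tail IO size) to
> // process that row in full.
> size_t firstOwnedRowOffset(const char* buf, size_t len, bool rangeStartsFile) {
>   if (rangeStartsFile) return 0;        // offset 0: no earlier range
>   for (size_t i = 0; i < len; ++i)
>     if (buf[i] == '\n') return i + 1;   // byte after the first row boundary
>   return len;                           // no row begins in this range
> }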
>  For compression, the compiler returns the range info such that the hdfs scan buffer can hold the full uncompressed content.
>  Cons:
> The reader-threads feature is too complex to maintain in C++.
> Error handling at the layer below the TCB is missing, or errors are not propagated to the work method, causing incorrect results.
> Possible multiple copying of data.
> Libhdfs calls are not optimized. It was observed that the method IDs are being obtained many times; need to check whether this problem still exists (see the JNI sketch after this list).
> Now that we clearly know what is expected, it could be optimized better:
>   - Reduced scan buffer size for smoother data flow
>   - Better thread utilization
>   - Avoid multiple copying of data.
> It is hard to see the need for two threads for pre-fetch, especially when one range is completed fully before the data from the next range is processed.
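> On the method-ID point above, the usual fix is to resolve JNI IDs once and cache them; a sketch with placeholder class/method names:
>
> #include <jni.h>
>
> // Resolved once per process; the jclass must be promoted to a global ref
> // to stay valid, and the jmethodID stays valid while the class is loaded.
> static jclass    gHdfsClientCls = nullptr;
> static jmethodID gReadMid       = nullptr;
>
> bool initHdfsClientIds(JNIEnv* env) {
>   jclass local = env->FindClass("org/example/HdfsClient");  // placeholder
>   if (local == nullptr) return false;
>   gHdfsClientCls = (jclass)env->NewGlobalRef(local);
>   env->DeleteLocalRef(local);
>   // Placeholder signature: byte[] read(long offset, int len)
>   gReadMid = env->GetMethodID(gHdfsClientCls, "read", "(JI)[B");
>   return gReadMid != nullptr;
> }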
>  Following are the hdfs calls used by programs in the exp and executor directories.
>                  U hdfsCloseFile
>                  U hdfsConnect
>                  U hdfsDelete
>                  U hdfsExists
>                  U hdfsFlush
>                  U hdfsFreeFileInfo
>                  U hdfsGetPathInfo
>                  U hdfsListDirectory
>                  U hdfsOpenFile
>                  U hdfsPread
>                  U hdfsRename
>                  U hdfsWrite
>                  U hdfsCreateDirectory
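> For reference, a minimal read path through these libhdfs calls (error handling trimmed; real code would reuse the hdfsFS handle rather than connect per read):
>
> #include <fcntl.h>   // O_RDONLY
> #include <hdfs.h>
>
> // Positioned read of len bytes at offset off; returns bytes read or -1.
> tSize readAt(const char* path, tOffset off, char* buf, tSize len) {
>   hdfsFS fs = hdfsConnect("default", 0);         // default fs from config
>   if (fs == NULL) return -1;
>   hdfsFile f = hdfsOpenFile(fs, path, O_RDONLY, 0, 0, 0);
>   if (f == NULL) { hdfsDisconnect(fs); return -1; }
>   tSize n = hdfsPread(fs, f, off, buf, len);     // read at offset, no seek
>   hdfsCloseFile(fs, f);
>   hdfsDisconnect(fs);
>   return n;
> }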
>  New implementation
>  Make changes to use the Java APIs directly for these calls. However, come up with a better mechanism to move the data between Java and JNI, avoid unnecessary copying of data, and manage threads better via the Executor concepts in Java. Hence it won't be a direct mapping of these calls to the hdfs Java API; instead, use an abstraction like the one being used for HBase access.
>  I believe the new implementation will be better optimized and hence will improve performance (though not by many fold).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)