You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@tajo.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2015/12/01 06:49:10 UTC

[jira] [Commented] (TAJO-1952) Implement PartitionFileFragment

    [ https://issues.apache.org/jira/browse/TAJO-1952?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15033151#comment-15033151 ] 

ASF GitHub Bot commented on TAJO-1952:
--------------------------------------

Github user hyunsik commented on a diff in the pull request:

    https://github.com/apache/tajo/pull/846#discussion_r46242293
  
    --- Diff: tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java ---
    @@ -550,6 +551,181 @@ public long getMinSplitSize() {
         return splits;
       }
     
    +  ////////////////////////////////////////////////////////////////////////////////
    +  // The below code is for splitting partitioned table.
    +  ////////////////////////////////////////////////////////////////////////////////
    +
    +  public PartitionFileFragment[] partitionSplit(String tableName, Path tablePath, String partitionKeys)
    +    throws IOException {
    +    return partitionSplit(tableName, tablePath, fs.getDefaultBlockSize(), partitionKeys);
    +  }
    +
    +  private PartitionFileFragment[] partitionSplit(String tableName, Path tablePath, long size, String partitionKeys)
    +    throws IOException {
    +    FileSystem fs = tablePath.getFileSystem(conf);
    +
    +    long defaultBlockSize = size;
    +    List<PartitionFileFragment> listTablets = new ArrayList<>();
    +    PartitionFileFragment tablet;
    +
    +    FileStatus[] fileLists = fs.listStatus(tablePath);
    +    for (FileStatus file : fileLists) {
    +      long remainFileSize = file.getLen();
    +      long start = 0;
    +      if (remainFileSize > defaultBlockSize) {
    +        while (remainFileSize > defaultBlockSize) {
    +          tablet = new PartitionFileFragment(tableName, file.getPath(), start, defaultBlockSize, partitionKeys);
    +          listTablets.add(tablet);
    +          start += defaultBlockSize;
    +          remainFileSize -= defaultBlockSize;
    +        }
    +        listTablets.add(new PartitionFileFragment(tableName, file.getPath(), start, remainFileSize, partitionKeys));
    +      } else {
    +        listTablets.add(new PartitionFileFragment(tableName, file.getPath(), 0, remainFileSize, partitionKeys));
    +      }
    +    }
    +
    +    PartitionFileFragment[] tablets = new PartitionFileFragment[listTablets.size()];
    +    listTablets.toArray(tablets);
    +
    +    return tablets;
    +  }
    +
    +  protected PartitionFileFragment makePartitionSplit(String fragmentId, Path file, long start, long length,
    +                                                     String[] hosts, String partitionKeys) {
    +    return new PartitionFileFragment(fragmentId, file, start, length, hosts, partitionKeys);
    +  }
    +
    +  protected PartitionFileFragment makePartitionSplit(String fragmentId, Path file, BlockLocation blockLocation
    +    , String partitionKeys) throws IOException {
    +    return new PartitionFileFragment(fragmentId, file, blockLocation, partitionKeys);
    +  }
    +
    +  protected Fragment makeNonPartitionSplit(String fragmentId, Path file, long start, long length,
    +                                           BlockLocation[] blkLocations, String partitionKeys) throws IOException {
    +
    +    Map<String, Integer> hostsBlockMap = new HashMap<>();
    +    for (BlockLocation blockLocation : blkLocations) {
    +      for (String host : blockLocation.getHosts()) {
    +        if (hostsBlockMap.containsKey(host)) {
    +          hostsBlockMap.put(host, hostsBlockMap.get(host) + 1);
    +        } else {
    +          hostsBlockMap.put(host, 1);
    +        }
    +      }
    +    }
    +
    +    List<Map.Entry<String, Integer>> entries = new ArrayList<>(hostsBlockMap.entrySet());
    +    Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() {
    --- End diff --
    
    It can be simplified with lambda.


> Implement PartitionFileFragment
> -------------------------------
>
>                 Key: TAJO-1952
>                 URL: https://issues.apache.org/jira/browse/TAJO-1952
>             Project: Tajo
>          Issue Type: Improvement
>          Components: Planner/Optimizer, Storage
>            Reporter: Jaehwa Jung
>            Assignee: Jaehwa Jung
>             Fix For: 0.12.0, 0.11.1
>
>         Attachments: TAJO-1952.patch
>
>
> Currently, PartitionedTableScanNode contains the list of partitions and it seems to me that the list has some problems as following:
> 1. Duplicate Informs: Task contains Fragment which specify target directory or target file for scanning. A path of partition lists already would write to Fragment. 
> 2. Network Resource: When scanning lost of partition, it will occupy network resource, for example, several hundred kilobytes or more. It looks like an unnecessary resource because Fragment already has the path of partitions.
> I want to improve above problems by implementing new Fragment called PartitionedFileFragment. Currently, I'm planning the implementation as following:
> * PartitionedFileFragment will borrow FileFragment and it contains the partition path and the partition key values.  
> * Remove the path array of partitions from PartitionedTableScanNode. 
> * Implement a method for getting filtered partition directories in FileTableSpace.
> * Implement a method for making PartitionedFileFragment array.
> * Before making splits, call above method and use it for making splits.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)