You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by go...@apache.org on 2014/09/19 07:56:57 UTC

svn commit: r1626128 - in /hive/trunk: ql/src/java/org/apache/hadoop/hive/ql/io/orc/ shims/0.20/src/main/java/org/apache/hadoop/hive/shims/ shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/ shims/0.23/src/main/java/org/apache/hadoop/hive/shims/ shims/common/src/main/java/org/apache/hadoop/hive/shims/

Author: gopalv
Date: Fri Sep 19 05:56:56 2014
New Revision: 1626128

URL: http://svn.apache.org/r1626128
Log:
HIVE-8038: Decouple ORC files split calculation logic from fixed-size block assumptions. (Pankit Thapar via Gopal V)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
    hive/trunk/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
    hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
    hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
    hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1626128&r1=1626127&r2=1626128&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Fri Sep 19 05:56:56 2014
@@ -24,6 +24,8 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
+import java.util.NavigableMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -610,7 +612,7 @@ public class OrcInputFormat  implements 
     private final FileSystem fs;
     private final FileStatus file;
     private final long blockSize;
-    private final BlockLocation[] locations;
+    private final TreeMap<Long, BlockLocation> locations;
     private final FileInfo fileInfo;
     private List<StripeInformation> stripes;
     private ReaderImpl.FileMetaInfo fileMetaInfo;
@@ -630,7 +632,7 @@ public class OrcInputFormat  implements 
       this.file = file;
       this.blockSize = file.getBlockSize();
       this.fileInfo = fileInfo;
-      locations = SHIMS.getLocations(fs, file);
+      locations = SHIMS.getLocationsWithOffset(fs, file);
       this.isOriginal = isOriginal;
       this.deltas = deltas;
       this.hasBase = hasBase;
@@ -641,8 +643,8 @@ public class OrcInputFormat  implements 
     }
 
     void schedule() throws IOException {
-      if(locations.length == 1 && file.getLen() < context.maxSize) {
-        String[] hosts = locations[0].getHosts();
+      if(locations.size() == 1 && file.getLen() < context.maxSize) {
+        String[] hosts = locations.firstEntry().getValue().getHosts();
         synchronized (context.splits) {
           context.splits.add(new OrcSplit(file.getPath(), 0, file.getLen(),
                 hosts, fileMetaInfo, isOriginal, hasBase, deltas));
@@ -690,15 +692,22 @@ public class OrcInputFormat  implements 
     void createSplit(long offset, long length,
                      ReaderImpl.FileMetaInfo fileMetaInfo) throws IOException {
       String[] hosts;
-      if ((offset % blockSize) + length <= blockSize) {
+      Map.Entry<Long, BlockLocation> startEntry = locations.floorEntry(offset);
+      BlockLocation start = startEntry.getValue();
+      if (offset + length <= start.getOffset() + start.getLength()) {
         // handle the single block case
-        hosts = locations[(int) (offset / blockSize)].getHosts();
+        hosts = start.getHosts();
       } else {
+        Map.Entry<Long, BlockLocation> endEntry = locations.floorEntry(offset + length);
+        BlockLocation end = endEntry.getValue();
+        //get the submap
+        NavigableMap<Long, BlockLocation> navigableMap = locations.subMap(startEntry.getKey(),
+                  true, endEntry.getKey(), true);
         // Calculate the number of bytes in the split that are local to each
         // host.
         Map<String, LongWritable> sizes = new HashMap<String, LongWritable>();
         long maxSize = 0;
-        for(BlockLocation block: locations) {
+        for (BlockLocation block : navigableMap.values()) {
           long overlap = getOverlap(offset, length, block.getOffset(),
               block.getLength());
           if (overlap > 0) {
@@ -711,6 +720,9 @@ public class OrcInputFormat  implements 
               val.set(val.get() + overlap);
               maxSize = Math.max(maxSize, val.get());
             }
+          } else {
+            throw new IOException("File " + file.getPath().toString() +
+                    " should have had overlap on block starting at " + block.getOffset());
           }
         }
         // filter the list of locations to those that have at least 80% of the
@@ -718,7 +730,7 @@ public class OrcInputFormat  implements 
         long threshold = (long) (maxSize * MIN_INCLUDED_LOCATION);
         List<String> hostList = new ArrayList<String>();
         // build the locations in a predictable order to simplify testing
-        for(BlockLocation block: locations) {
+        for(BlockLocation block: navigableMap.values()) {
           for(String host: block.getHosts()) {
             if (sizes.containsKey(host)) {
               if (sizes.get(host).get() >= threshold) {

Modified: hive/trunk/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java?rev=1626128&r1=1626127&r2=1626128&view=diff
==============================================================================
--- hive/trunk/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java (original)
+++ hive/trunk/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java Fri Sep 19 05:56:56 2014
@@ -37,6 +37,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
 
 import javax.security.auth.Subject;
 import javax.security.auth.login.LoginException;
@@ -652,6 +653,17 @@ public class Hadoop20Shims implements Ha
   }
 
   @Override
+  public TreeMap<Long, BlockLocation> getLocationsWithOffset(FileSystem fs,
+                                                             FileStatus status) throws IOException {
+    TreeMap<Long, BlockLocation> offsetBlockMap = new TreeMap<Long, BlockLocation>();
+    BlockLocation[] locations = getLocations(fs, status);
+    for (BlockLocation location : locations) {
+      offsetBlockMap.put(location.getOffset(), location);
+    }
+    return offsetBlockMap;
+  }
+
+  @Override
   public void hflush(FSDataOutputStream stream) throws IOException {
     stream.sync();
   }

Modified: hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java?rev=1626128&r1=1626127&r2=1626128&view=diff
==============================================================================
--- hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java (original)
+++ hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java Fri Sep 19 05:56:56 2014
@@ -27,6 +27,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
@@ -403,6 +404,17 @@ public class Hadoop20SShims extends Hado
   }
 
   @Override
+  public TreeMap<Long, BlockLocation> getLocationsWithOffset(FileSystem fs,
+                                                             FileStatus status) throws IOException {
+    TreeMap<Long, BlockLocation> offsetBlockMap = new TreeMap<Long, BlockLocation>();
+    BlockLocation[] locations = getLocations(fs, status);
+    for (BlockLocation location : locations) {
+      offsetBlockMap.put(location.getOffset(), location);
+    }
+    return offsetBlockMap;
+  }
+
+  @Override
   public void hflush(FSDataOutputStream stream) throws IOException {
     stream.sync();
   }

Modified: hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java?rev=1626128&r1=1626127&r2=1626128&view=diff
==============================================================================
--- hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java (original)
+++ hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java Fri Sep 19 05:56:56 2014
@@ -29,6 +29,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -511,6 +512,17 @@ public class Hadoop23Shims extends Hadoo
   }
 
   @Override
+  public TreeMap<Long, BlockLocation> getLocationsWithOffset(FileSystem fs,
+                                                             FileStatus status) throws IOException {
+    TreeMap<Long, BlockLocation> offsetBlockMap = new TreeMap<Long, BlockLocation>();
+    BlockLocation[] locations = getLocations(fs, status);
+    for (BlockLocation location : locations) {
+      offsetBlockMap.put(location.getOffset(), location);
+    }
+    return offsetBlockMap;
+  }
+
+  @Override
   public void hflush(FSDataOutputStream stream) throws IOException {
     stream.hflush();
   }

Modified: hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java?rev=1626128&r1=1626127&r2=1626128&view=diff
==============================================================================
--- hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java (original)
+++ hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java Fri Sep 19 05:56:56 2014
@@ -30,6 +30,7 @@ import java.security.PrivilegedException
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 
 import javax.security.auth.login.LoginException;
 
@@ -477,6 +478,19 @@ public interface HadoopShims {
       FileStatus status) throws IOException;
 
   /**
+   * Converts the block locations returned by getLocations() into a {@code TreeMap}
+   * keyed by each block's starting offset, mapping offset to {@code BlockLocation}.
+   * Keying by offset makes it O(log n) to find the block containing a given offset
+   * (for example via {@code floorEntry}).
+   * @param fs the file system
+   * @param status the file information
+   * @return a {@code TreeMap<Long, BlockLocation>} from block start offset to block
+   * @throws IOException
+   */
+  TreeMap<Long, BlockLocation> getLocationsWithOffset(FileSystem fs,
+      FileStatus status) throws IOException;
+
+  /**
    * Flush and make visible to other users the changes to the given stream.
    * @param stream the stream to hflush.
    * @throws IOException