You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by go...@apache.org on 2014/09/19 07:56:57 UTC
svn commit: r1626128 - in /hive/trunk:
ql/src/java/org/apache/hadoop/hive/ql/io/orc/
shims/0.20/src/main/java/org/apache/hadoop/hive/shims/
shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/
shims/0.23/src/main/java/org/apache/hadoop/hive/shims/
shims/common/src/main/java/org/apache/hadoop/hive/shims/
Author: gopalv
Date: Fri Sep 19 05:56:56 2014
New Revision: 1626128
URL: http://svn.apache.org/r1626128
Log:
HIVE-8038: Decouple ORC files split calculation logic from fixed-size block assumptions. (Pankit Thapar via Gopal V)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
hive/trunk/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1626128&r1=1626127&r2=1626128&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Fri Sep 19 05:56:56 2014
@@ -24,6 +24,8 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
+import java.util.NavigableMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -610,7 +612,7 @@ public class OrcInputFormat implements
private final FileSystem fs;
private final FileStatus file;
private final long blockSize;
- private final BlockLocation[] locations;
+ private final TreeMap<Long, BlockLocation> locations;
private final FileInfo fileInfo;
private List<StripeInformation> stripes;
private ReaderImpl.FileMetaInfo fileMetaInfo;
@@ -630,7 +632,7 @@ public class OrcInputFormat implements
this.file = file;
this.blockSize = file.getBlockSize();
this.fileInfo = fileInfo;
- locations = SHIMS.getLocations(fs, file);
+ locations = SHIMS.getLocationsWithOffset(fs, file);
this.isOriginal = isOriginal;
this.deltas = deltas;
this.hasBase = hasBase;
@@ -641,8 +643,8 @@ public class OrcInputFormat implements
}
void schedule() throws IOException {
- if(locations.length == 1 && file.getLen() < context.maxSize) {
- String[] hosts = locations[0].getHosts();
+ if(locations.size() == 1 && file.getLen() < context.maxSize) {
+ String[] hosts = locations.firstEntry().getValue().getHosts();
synchronized (context.splits) {
context.splits.add(new OrcSplit(file.getPath(), 0, file.getLen(),
hosts, fileMetaInfo, isOriginal, hasBase, deltas));
@@ -690,15 +692,22 @@ public class OrcInputFormat implements
void createSplit(long offset, long length,
ReaderImpl.FileMetaInfo fileMetaInfo) throws IOException {
String[] hosts;
- if ((offset % blockSize) + length <= blockSize) {
+ Map.Entry<Long, BlockLocation> startEntry = locations.floorEntry(offset);
+ BlockLocation start = startEntry.getValue();
+ if (offset + length <= start.getOffset() + start.getLength()) {
// handle the single block case
- hosts = locations[(int) (offset / blockSize)].getHosts();
+ hosts = start.getHosts();
} else {
+ Map.Entry<Long, BlockLocation> endEntry = locations.floorEntry(offset + length);
+ BlockLocation end = endEntry.getValue();
+ //get the submap
+ NavigableMap<Long, BlockLocation> navigableMap = locations.subMap(startEntry.getKey(),
+ true, endEntry.getKey(), true);
// Calculate the number of bytes in the split that are local to each
// host.
Map<String, LongWritable> sizes = new HashMap<String, LongWritable>();
long maxSize = 0;
- for(BlockLocation block: locations) {
+ for (BlockLocation block : navigableMap.values()) {
long overlap = getOverlap(offset, length, block.getOffset(),
block.getLength());
if (overlap > 0) {
@@ -711,6 +720,9 @@ public class OrcInputFormat implements
val.set(val.get() + overlap);
maxSize = Math.max(maxSize, val.get());
}
+ } else {
+ throw new IOException("File " + file.getPath().toString() +
+ " should have had overlap on block starting at " + block.getOffset());
}
}
// filter the list of locations to those that have at least 80% of the
@@ -718,7 +730,7 @@ public class OrcInputFormat implements
long threshold = (long) (maxSize * MIN_INCLUDED_LOCATION);
List<String> hostList = new ArrayList<String>();
// build the locations in a predictable order to simplify testing
- for(BlockLocation block: locations) {
+ for(BlockLocation block: navigableMap.values()) {
for(String host: block.getHosts()) {
if (sizes.containsKey(host)) {
if (sizes.get(host).get() >= threshold) {
Modified: hive/trunk/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java?rev=1626128&r1=1626127&r2=1626128&view=diff
==============================================================================
--- hive/trunk/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java (original)
+++ hive/trunk/shims/0.20/src/main/java/org/apache/hadoop/hive/shims/Hadoop20Shims.java Fri Sep 19 05:56:56 2014
@@ -37,6 +37,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeMap;
import javax.security.auth.Subject;
import javax.security.auth.login.LoginException;
@@ -652,6 +653,17 @@ public class Hadoop20Shims implements Ha
}
@Override
+ public TreeMap<Long, BlockLocation> getLocationsWithOffset(FileSystem fs,
+     FileStatus status) throws IOException {
+   // Key every block by the byte offset at which it starts, so a caller can
+   // locate the block covering any offset with TreeMap.floorEntry().
+   BlockLocation[] blocks = getLocations(fs, status);
+   TreeMap<Long, BlockLocation> byOffset = new TreeMap<Long, BlockLocation>();
+   for (int i = 0; i < blocks.length; i++) {
+     BlockLocation block = blocks[i];
+     byOffset.put(block.getOffset(), block);
+   }
+   return byOffset;
+ }
+
+ @Override
public void hflush(FSDataOutputStream stream) throws IOException {
stream.sync();
}
Modified: hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java?rev=1626128&r1=1626127&r2=1626128&view=diff
==============================================================================
--- hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java (original)
+++ hive/trunk/shims/0.20S/src/main/java/org/apache/hadoop/hive/shims/Hadoop20SShims.java Fri Sep 19 05:56:56 2014
@@ -27,6 +27,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
@@ -403,6 +404,17 @@ public class Hadoop20SShims extends Hado
}
@Override
+ public TreeMap<Long, BlockLocation> getLocationsWithOffset(FileSystem fs,
+     FileStatus status) throws IOException {
+   // Key every block by the byte offset at which it starts, so a caller can
+   // locate the block covering any offset with TreeMap.floorEntry().
+   BlockLocation[] blocks = getLocations(fs, status);
+   TreeMap<Long, BlockLocation> byOffset = new TreeMap<Long, BlockLocation>();
+   for (int i = 0; i < blocks.length; i++) {
+     BlockLocation block = blocks[i];
+     byOffset.put(block.getOffset(), block);
+   }
+   return byOffset;
+ }
+
+ @Override
public void hflush(FSDataOutputStream stream) throws IOException {
stream.sync();
}
Modified: hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java?rev=1626128&r1=1626127&r2=1626128&view=diff
==============================================================================
--- hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java (original)
+++ hive/trunk/shims/0.23/src/main/java/org/apache/hadoop/hive/shims/Hadoop23Shims.java Fri Sep 19 05:56:56 2014
@@ -29,6 +29,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
@@ -511,6 +512,17 @@ public class Hadoop23Shims extends Hadoo
}
@Override
+ public TreeMap<Long, BlockLocation> getLocationsWithOffset(FileSystem fs,
+     FileStatus status) throws IOException {
+   // Key every block by the byte offset at which it starts, so a caller can
+   // locate the block covering any offset with TreeMap.floorEntry().
+   BlockLocation[] blocks = getLocations(fs, status);
+   TreeMap<Long, BlockLocation> byOffset = new TreeMap<Long, BlockLocation>();
+   for (int i = 0; i < blocks.length; i++) {
+     BlockLocation block = blocks[i];
+     byOffset.put(block.getOffset(), block);
+   }
+   return byOffset;
+ }
+
+ @Override
public void hflush(FSDataOutputStream stream) throws IOException {
stream.hflush();
}
Modified: hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java?rev=1626128&r1=1626127&r2=1626128&view=diff
==============================================================================
--- hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java (original)
+++ hive/trunk/shims/common/src/main/java/org/apache/hadoop/hive/shims/HadoopShims.java Fri Sep 19 05:56:56 2014
@@ -30,6 +30,7 @@ import java.security.PrivilegedException
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import javax.security.auth.login.LoginException;
@@ -477,6 +478,19 @@ public interface HadoopShims {
FileStatus status) throws IOException;
/**
+ * Returns the block locations reported by {@link #getLocations}, indexed by
+ * each block's starting byte offset.
+ * Keying a {@link TreeMap} on {@code BlockLocation.getOffset()} lets callers find
+ * the block containing an arbitrary file offset (for example with
+ * {@code floorEntry(offset)}) in O(log n), instead of assuming fixed-size blocks.
+ * @param fs the file system
+ * @param status the file information
+ * @return a map from block start offset to the {@code BlockLocation} starting there
+ * @throws IOException if looking up the block locations fails
+ */
+ TreeMap<Long, BlockLocation> getLocationsWithOffset(FileSystem fs,
+ FileStatus status) throws IOException;
+
+ /**
* Flush and make visible to other users the changes to the given stream.
* @param stream the stream to hflush.
* @throws IOException