You are viewing a plain-text rendering of this content; the canonical version, with working links, is available from the commits@hive.apache.org mailing-list archive.
Posted to commits@hive.apache.org by se...@apache.org on 2016/03/09 20:14:46 UTC
[1/3] hive git commit: HIVE-13216 : ORC Reader will leave file open
until GC when opening a malformed ORC file (Sergey Shelukhin,
reviewed by Prasanth Jayachandran)
Repository: hive
Updated Branches:
refs/heads/branch-1 5c8b6c0a1 -> e1d9eb55e
refs/heads/master 61b66449b -> 5bf324ea5
HIVE-13216 : ORC Reader will leave file open until GC when opening a malformed ORC file (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3931d4d6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3931d4d6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3931d4d6
Branch: refs/heads/master
Commit: 3931d4d67fe2ca930f0ca6ed2d9bd6ff37ff9087
Parents: 61b6644
Author: Sergey Shelukhin <se...@apache.org>
Authored: Wed Mar 9 10:51:58 2016 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Wed Mar 9 10:51:58 2016 -0800
----------------------------------------------------------------------
.../hadoop/hive/ql/io/orc/ReaderImpl.java | 108 ++++++++++---------
1 file changed, 57 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/3931d4d6/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
index 1299c9c..773c2b1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
@@ -485,61 +485,67 @@ public class ReaderImpl implements Reader {
long maxFileLength
) throws IOException {
FSDataInputStream file = fs.open(path);
+ ByteBuffer buffer = null, fullFooterBuffer = null;
+ OrcProto.PostScript ps = null;
+ OrcFile.WriterVersion writerVersion = null;
+ try {
+ // figure out the size of the file using the option or filesystem
+ long size;
+ if (maxFileLength == Long.MAX_VALUE) {
+ size = fs.getFileStatus(path).getLen();
+ } else {
+ size = maxFileLength;
+ }
- // figure out the size of the file using the option or filesystem
- long size;
- if (maxFileLength == Long.MAX_VALUE) {
- size = fs.getFileStatus(path).getLen();
- } else {
- size = maxFileLength;
- }
-
- //read last bytes into buffer to get PostScript
- int readSize = (int) Math.min(size, DIRECTORY_SIZE_GUESS);
- ByteBuffer buffer = ByteBuffer.allocate(readSize);
- assert buffer.position() == 0;
- file.readFully((size - readSize),
- buffer.array(), buffer.arrayOffset(), readSize);
- buffer.position(0);
-
- //read the PostScript
- //get length of PostScript
- int psLen = buffer.get(readSize - 1) & 0xff;
- ensureOrcFooter(file, path, psLen, buffer);
- int psOffset = readSize - 1 - psLen;
- OrcProto.PostScript ps = extractPostScript(buffer, path, psLen, psOffset);
-
- int footerSize = (int) ps.getFooterLength();
- int metadataSize = (int) ps.getMetadataLength();
- OrcFile.WriterVersion writerVersion = extractWriterVersion(ps);
-
-
- //check if extra bytes need to be read
- ByteBuffer fullFooterBuffer = null;
- int extra = Math.max(0, psLen + 1 + footerSize + metadataSize - readSize);
- if (extra > 0) {
- //more bytes need to be read, seek back to the right place and read extra bytes
- ByteBuffer extraBuf = ByteBuffer.allocate(extra + readSize);
- file.readFully((size - readSize - extra), extraBuf.array(),
- extraBuf.arrayOffset() + extraBuf.position(), extra);
- extraBuf.position(extra);
- //append with already read bytes
- extraBuf.put(buffer);
- buffer = extraBuf;
+ //read last bytes into buffer to get PostScript
+ int readSize = (int) Math.min(size, DIRECTORY_SIZE_GUESS);
+ buffer = ByteBuffer.allocate(readSize);
+ assert buffer.position() == 0;
+ file.readFully((size - readSize),
+ buffer.array(), buffer.arrayOffset(), readSize);
buffer.position(0);
- fullFooterBuffer = buffer.slice();
- buffer.limit(footerSize + metadataSize);
- } else {
- //footer is already in the bytes in buffer, just adjust position, length
- buffer.position(psOffset - footerSize - metadataSize);
- fullFooterBuffer = buffer.slice();
- buffer.limit(psOffset);
- }
- // remember position for later
- buffer.mark();
+ //read the PostScript
+ //get length of PostScript
+ int psLen = buffer.get(readSize - 1) & 0xff;
+ ensureOrcFooter(file, path, psLen, buffer);
+ int psOffset = readSize - 1 - psLen;
+ ps = extractPostScript(buffer, path, psLen, psOffset);
+
+ int footerSize = (int) ps.getFooterLength();
+ int metadataSize = (int) ps.getMetadataLength();
+ writerVersion = extractWriterVersion(ps);
+
+ //check if extra bytes need to be read
+ int extra = Math.max(0, psLen + 1 + footerSize + metadataSize - readSize);
+ if (extra > 0) {
+ //more bytes need to be read, seek back to the right place and read extra bytes
+ ByteBuffer extraBuf = ByteBuffer.allocate(extra + readSize);
+ file.readFully((size - readSize - extra), extraBuf.array(),
+ extraBuf.arrayOffset() + extraBuf.position(), extra);
+ extraBuf.position(extra);
+ //append with already read bytes
+ extraBuf.put(buffer);
+ buffer = extraBuf;
+ buffer.position(0);
+ fullFooterBuffer = buffer.slice();
+ buffer.limit(footerSize + metadataSize);
+ } else {
+ //footer is already in the bytes in buffer, just adjust position, length
+ buffer.position(psOffset - footerSize - metadataSize);
+ fullFooterBuffer = buffer.slice();
+ buffer.limit(psOffset);
+ }
- file.close();
+ // remember position for later TODO: what later? this comment is useless
+ buffer.mark();
+ } finally {
+ try {
+ file.close();
+ } catch (IOException ex) {
+ LOG.error("Failed to close the file after another error", ex);
+ }
+ }
return new FileMetaInfo(
ps.getCompression().toString(),
[2/3] hive git commit: HIVE-13216 : ORC Reader will leave file open
until GC when opening a malformed ORC file (Sergey Shelukhin,
reviewed by Prasanth Jayachandran)
Posted by se...@apache.org.
HIVE-13216 : ORC Reader will leave file open until GC when opening a malformed ORC file (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e1d9eb55
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e1d9eb55
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e1d9eb55
Branch: refs/heads/branch-1
Commit: e1d9eb55ef403df86a343b4c67d32fdfdd71b6c5
Parents: 5c8b6c0
Author: Sergey Shelukhin <se...@apache.org>
Authored: Wed Mar 9 10:57:29 2016 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Wed Mar 9 10:57:29 2016 -0800
----------------------------------------------------------------------
.../hadoop/hive/ql/io/orc/ReaderImpl.java | 141 ++++++++++---------
1 file changed, 74 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/e1d9eb55/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
index a7c564a..d42675c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
@@ -352,79 +352,86 @@ public class ReaderImpl implements Reader {
long maxFileLength
) throws IOException {
FSDataInputStream file = fs.open(path);
+ ByteBuffer buffer = null;
+ OrcProto.PostScript ps = null;
+ OrcFile.WriterVersion writerVersion = null;
+ try {
+ // figure out the size of the file using the option or filesystem
+ long size;
+ if (maxFileLength == Long.MAX_VALUE) {
+ size = fs.getFileStatus(path).getLen();
+ } else {
+ size = maxFileLength;
+ }
- // figure out the size of the file using the option or filesystem
- long size;
- if (maxFileLength == Long.MAX_VALUE) {
- size = fs.getFileStatus(path).getLen();
- } else {
- size = maxFileLength;
- }
+ //read last bytes into buffer to get PostScript
+ int readSize = (int) Math.min(size, DIRECTORY_SIZE_GUESS);
+ buffer = ByteBuffer.allocate(readSize);
+ file.readFully((size - readSize),
+ buffer.array(), buffer.arrayOffset(), readSize);
+
+ //read the PostScript
+ //get length of PostScript
+ int psLen = buffer.get(readSize - 1) & 0xff;
+ ensureOrcFooter(file, path, psLen, buffer);
+ int psOffset = readSize - 1 - psLen;
+ CodedInputStream in = CodedInputStream.newInstance(buffer.array(),
+ buffer.arrayOffset() + psOffset, psLen);
+ ps = OrcProto.PostScript.parseFrom(in);
+
+ checkOrcVersion(LOG, path, ps.getVersionList());
+
+ int footerSize = (int) ps.getFooterLength();
+ int metadataSize = (int) ps.getMetadataLength();
+ if (ps.hasWriterVersion()) {
+ writerVersion = getWriterVersion(ps.getWriterVersion());
+ } else {
+ writerVersion = OrcFile.WriterVersion.ORIGINAL;
+ }
- //read last bytes into buffer to get PostScript
- int readSize = (int) Math.min(size, DIRECTORY_SIZE_GUESS);
- ByteBuffer buffer = ByteBuffer.allocate(readSize);
- file.readFully((size - readSize),
- buffer.array(), buffer.arrayOffset(), readSize);
-
- //read the PostScript
- //get length of PostScript
- int psLen = buffer.get(readSize - 1) & 0xff;
- ensureOrcFooter(file, path, psLen, buffer);
- int psOffset = readSize - 1 - psLen;
- CodedInputStream in = CodedInputStream.newInstance(buffer.array(),
- buffer.arrayOffset() + psOffset, psLen);
- OrcProto.PostScript ps = OrcProto.PostScript.parseFrom(in);
-
- checkOrcVersion(LOG, path, ps.getVersionList());
-
- int footerSize = (int) ps.getFooterLength();
- int metadataSize = (int) ps.getMetadataLength();
- OrcFile.WriterVersion writerVersion;
- if (ps.hasWriterVersion()) {
- writerVersion = getWriterVersion(ps.getWriterVersion());
- } else {
- writerVersion = OrcFile.WriterVersion.ORIGINAL;
- }
+ //check compression codec
+ switch (ps.getCompression()) {
+ case NONE:
+ break;
+ case ZLIB:
+ break;
+ case SNAPPY:
+ break;
+ case LZO:
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown compression");
+ }
- //check compression codec
- switch (ps.getCompression()) {
- case NONE:
- break;
- case ZLIB:
- break;
- case SNAPPY:
- break;
- case LZO:
- break;
- default:
- throw new IllegalArgumentException("Unknown compression");
- }
+ //check if extra bytes need to be read
+ int extra = Math.max(0, psLen + 1 + footerSize + metadataSize - readSize);
+ if (extra > 0) {
+ //more bytes need to be read, seek back to the right place and read extra bytes
+ ByteBuffer extraBuf = ByteBuffer.allocate(extra + readSize);
+ file.readFully((size - readSize - extra), extraBuf.array(),
+ extraBuf.arrayOffset() + extraBuf.position(), extra);
+ extraBuf.position(extra);
+ //append with already read bytes
+ extraBuf.put(buffer);
+ buffer = extraBuf;
+ buffer.position(0);
+ buffer.limit(footerSize + metadataSize);
+ } else {
+ //footer is already in the bytes in buffer, just adjust position, length
+ buffer.position(psOffset - footerSize - metadataSize);
+ buffer.limit(psOffset);
+ }
- //check if extra bytes need to be read
- int extra = Math.max(0, psLen + 1 + footerSize + metadataSize - readSize);
- if (extra > 0) {
- //more bytes need to be read, seek back to the right place and read extra bytes
- ByteBuffer extraBuf = ByteBuffer.allocate(extra + readSize);
- file.readFully((size - readSize - extra), extraBuf.array(),
- extraBuf.arrayOffset() + extraBuf.position(), extra);
- extraBuf.position(extra);
- //append with already read bytes
- extraBuf.put(buffer);
- buffer = extraBuf;
- buffer.position(0);
- buffer.limit(footerSize + metadataSize);
- } else {
- //footer is already in the bytes in buffer, just adjust position, length
- buffer.position(psOffset - footerSize - metadataSize);
- buffer.limit(psOffset);
+ // remember position for later
+ buffer.mark();
+ } finally {
+ try {
+ file.close();
+ } catch (IOException ex) {
+ LOG.error("Failed to close the file after another error", ex);
+ }
}
- // remember position for later
- buffer.mark();
-
- file.close();
-
return new FileMetaInfo(
ps.getCompression().toString(),
(int) ps.getCompressionBlockSize(),
[3/3] hive git commit: HIVE-13211 : normalize Hive.get overloads to
go thru one path (Sergey Shelukhin, reviewed by Ashutosh Chauhan)
Posted by se...@apache.org.
HIVE-13211 : normalize Hive.get overloads to go thru one path (Sergey Shelukhin, reviewed by Ashutosh Chauhan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/5bf324ea
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/5bf324ea
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/5bf324ea
Branch: refs/heads/master
Commit: 5bf324ea5eaf308233a2af6149e8fb01bee0e4c6
Parents: 3931d4d
Author: Sergey Shelukhin <se...@apache.org>
Authored: Wed Mar 9 11:03:52 2016 -0800
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Wed Mar 9 11:03:52 2016 -0800
----------------------------------------------------------------------
.../apache/hadoop/hive/ql/metadata/Hive.java | 92 +++++++++++---------
.../hive/ql/parse/BaseSemanticAnalyzer.java | 2 +-
2 files changed, 52 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/5bf324ea/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index fdc7956..80208c2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -248,7 +248,7 @@ public class Hive {
*
*/
public static Hive get(HiveConf c) throws HiveException {
- return getInternal(c, false);
+ return getInternal(c, false, false, true);
}
/**
@@ -256,24 +256,67 @@ public class Hive {
* MS client, assuming the relevant settings would be unchanged within the same conf object.
*/
public static Hive getWithFastCheck(HiveConf c) throws HiveException {
- return getInternal(c, true);
+ return getWithFastCheck(c, true);
}
- private static Hive getInternal(HiveConf c, boolean isFastCheck) throws HiveException {
+ /**
+ * Same as {@link #get(HiveConf)}, except that it checks only the object identity of existing
+ * MS client, assuming the relevant settings would be unchanged within the same conf object.
+ */
+ public static Hive getWithFastCheck(HiveConf c, boolean doRegisterAllFns) throws HiveException {
+ return getInternal(c, false, true, doRegisterAllFns);
+ }
+
+ private static Hive getInternal(HiveConf c, boolean needsRefresh, boolean isFastCheck,
+ boolean doRegisterAllFns) throws HiveException {
Hive db = hiveDB.get();
- if (db == null || !db.isCurrentUserOwner() ||
- (db.metaStoreClient != null && !isCompatible(db, c, isFastCheck))) {
- return get(c, true);
+ if (db == null || !db.isCurrentUserOwner() || needsRefresh
+ || (c != null && db.metaStoreClient != null && !isCompatible(db, c, isFastCheck))) {
+ return create(c, false, db, doRegisterAllFns);
+ }
+ if (c != null) {
+ db.conf = c;
}
- db.conf = c;
return db;
}
+ private static Hive create(HiveConf c, boolean needsRefresh, Hive db, boolean doRegisterAllFns)
+ throws HiveException {
+ if (db != null) {
+ LOG.debug("Creating new db. db = " + db + ", needsRefresh = " + needsRefresh +
+ ", db.isCurrentUserOwner = " + db.isCurrentUserOwner());
+ db.close();
+ }
+ closeCurrent();
+ if (c == null) {
+ c = createHiveConf();
+ }
+ c.set("fs.scheme.class", "dfs");
+ Hive newdb = new Hive(c, doRegisterAllFns);
+ hiveDB.set(newdb);
+ return newdb;
+ }
+
+
+ private static HiveConf createHiveConf() {
+ SessionState session = SessionState.get();
+ return (session == null) ? new HiveConf(Hive.class) : session.getConf();
+ }
+
private static boolean isCompatible(Hive db, HiveConf c, boolean isFastCheck) {
return isFastCheck
? db.metaStoreClient.isSameConfObj(c) : db.metaStoreClient.isCompatibleWith(c);
}
+
+ public static Hive get() throws HiveException {
+ return get(true);
+ }
+
+ public static Hive get(boolean doRegisterAllFns) throws HiveException {
+ return getInternal(null, false, false, doRegisterAllFns);
+ }
+
/**
* get a connection to metastore. see get(HiveConf) function for comments
*
@@ -285,40 +328,7 @@ public class Hive {
* @throws HiveException
*/
public static Hive get(HiveConf c, boolean needsRefresh) throws HiveException {
- Hive db = hiveDB.get();
- if (db == null || needsRefresh || !db.isCurrentUserOwner()) {
- if (db != null) {
- LOG.debug("Creating new db. db = " + db + ", needsRefresh = " + needsRefresh +
- ", db.isCurrentUserOwner = " + db.isCurrentUserOwner());
- }
- closeCurrent();
- c.set("fs.scheme.class", "dfs");
- Hive newdb = new Hive(c, true);
- hiveDB.set(newdb);
- return newdb;
- }
- db.conf = c;
- return db;
- }
-
- public static Hive get() throws HiveException {
- return get(true);
- }
-
- public static Hive get(boolean doRegisterAllFns) throws HiveException {
- Hive db = hiveDB.get();
- if (db != null && !db.isCurrentUserOwner()) {
- LOG.debug("Creating new db. db.isCurrentUserOwner = " + db.isCurrentUserOwner());
- db.close();
- db = null;
- }
- if (db == null) {
- SessionState session = SessionState.get();
- HiveConf conf = session == null ? new HiveConf(Hive.class) : session.getConf();
- db = new Hive(conf, doRegisterAllFns);
- hiveDB.set(db);
- }
- return db;
+ return getInternal(c, needsRefresh, false, true);
}
public static void set(Hive hive) {
http://git-wip-us.apache.org/repos/asf/hive/blob/5bf324ea/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
index b36a9a0..f6ba521 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
@@ -200,7 +200,7 @@ public abstract class BaseSemanticAnalyzer {
}
public BaseSemanticAnalyzer(HiveConf conf) throws SemanticException {
- this(conf, createHiveDB(conf));
+ this(conf, createHiveDB(conf));
}
public BaseSemanticAnalyzer(HiveConf conf, Hive db) throws SemanticException {