You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/12/06 14:47:30 UTC
carbondata git commit: [CARBONDATA-1781] Fix EOFException in
StreamBlockletReader
Repository: carbondata
Updated Branches:
refs/heads/master e2a2d9931 -> 59eff88b0
[CARBONDATA-1781] Fix EOFException in StreamBlockletReader
In StreamBlockletReader.readBytesFromStream method, if the length of the reading is not enough, continue to read more data.
This closes #1621
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/59eff88b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/59eff88b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/59eff88b
Branch: refs/heads/master
Commit: 59eff88b02817d711e40bffb2b065ba0c261d395
Parents: e2a2d99
Author: QiangCai <qi...@qq.com>
Authored: Wed Dec 6 16:32:56 2017 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Dec 6 22:47:15 2017 +0800
----------------------------------------------------------------------
.../hadoop/streaming/StreamBlockletReader.java | 45 ++++++++++++--------
1 file changed, 28 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/59eff88b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletReader.java
index eafb142..1989198 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletReader.java
@@ -48,11 +48,11 @@ public class StreamBlockletReader {
StreamBlockletReader(byte[] syncMarker, InputStream in, long limit, boolean isHeaderPresent) {
this.syncMarker = syncMarker;
- this.syncLen = syncMarker.length;
- this.syncBuffer = new byte[syncMarker.length];
+ syncLen = syncMarker.length;
+ syncBuffer = new byte[syncLen];
this.in = in;
- this.limitStart = limit;
- this.limitEnd = limitStart + syncLen;
+ limitStart = limit;
+ limitEnd = limitStart + syncLen;
this.isHeaderPresent = isHeaderPresent;
}
@@ -66,11 +66,9 @@ public class StreamBlockletReader {
* find the first position of sync_marker in input stream
*/
private boolean sync() throws IOException {
- int len = in.read(syncBuffer);
- if (len < syncLen) {
+ if (!readBytesFromStream(syncBuffer, 0, syncLen)) {
return false;
}
- pos += syncLen;
boolean skipHeader = false;
for (int i = 0; i < limitStart; i++) {
int j = 0;
@@ -101,7 +99,9 @@ public class StreamBlockletReader {
BlockletHeader readBlockletHeader() throws IOException {
int len = readIntFromStream();
byte[] b = new byte[len];
- readBytesFromStream(b);
+ if (!readBytesFromStream(b, 0, len)) {
+ throw new EOFException("Failed to read blocklet header");
+ }
BlockletHeader header = CarbonUtil.readBlockletHeader(b);
rowNums = header.getBlocklet_info().getNum_rows();
rowIndex = 0;
@@ -113,7 +113,9 @@ public class StreamBlockletReader {
offset = 0;
int len = readIntFromStream();
byte[] b = new byte[len];
- readBytesFromStream(b);
+ if (!readBytesFromStream(b, 0, len)) {
+ throw new EOFException("Failed to read blocklet data");
+ }
compressor.rawUncompress(b, buffer);
}
@@ -143,11 +145,9 @@ public class StreamBlockletReader {
return false;
}
if (isAlreadySync) {
- int v = in.read(syncBuffer);
- if (v < syncLen) {
+ if (!readBytesFromStream(syncBuffer, 0, syncLen)) {
return false;
}
- pos += syncLen;
} else {
isAlreadySync = true;
if (!sync()) {
@@ -176,12 +176,23 @@ public class StreamBlockletReader {
return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
}
- void readBytesFromStream(byte[] b) throws IOException {
- int len = in.read(b, 0, b.length);
- if (len < b.length) {
- throw new EOFException();
+ /**
+ * Reads <code>len</code> bytes of data from the input stream into
+ * an array of bytes.
+ * @return <code>true</code> if reading data successfully, or
+ * <code>false</code> if there is no more data because the end of the stream has been reached.
+ */
+ boolean readBytesFromStream(byte[] b, int offset, int len) throws IOException {
+ int readLen = in.read(b, offset, len);
+ if (readLen < 0) {
+ return false;
+ }
+ pos += readLen;
+ if (readLen < len) {
+ return readBytesFromStream(b, offset + readLen, len - readLen);
+ } else {
+ return true;
}
- pos += b.length;
}
boolean readBoolean() {