You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/12/06 14:47:30 UTC

carbondata git commit: [CARBONDATA-1781] Fix EOFException in StreamBlockletReader

Repository: carbondata
Updated Branches:
  refs/heads/master e2a2d9931 -> 59eff88b0


[CARBONDATA-1781] Fix EOFException in StreamBlockletReader

In the StreamBlockletReader.readBytesFromStream method, if a single read returns fewer bytes than requested, keep reading until the requested length has been read or the end of the stream is reached.

This closes #1621


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/59eff88b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/59eff88b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/59eff88b

Branch: refs/heads/master
Commit: 59eff88b02817d711e40bffb2b065ba0c261d395
Parents: e2a2d99
Author: QiangCai <qi...@qq.com>
Authored: Wed Dec 6 16:32:56 2017 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Dec 6 22:47:15 2017 +0800

----------------------------------------------------------------------
 .../hadoop/streaming/StreamBlockletReader.java  | 45 ++++++++++++--------
 1 file changed, 28 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/59eff88b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletReader.java
index eafb142..1989198 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/StreamBlockletReader.java
@@ -48,11 +48,11 @@ public class StreamBlockletReader {
 
   StreamBlockletReader(byte[] syncMarker, InputStream in, long limit, boolean isHeaderPresent) {
     this.syncMarker = syncMarker;
-    this.syncLen = syncMarker.length;
-    this.syncBuffer = new byte[syncMarker.length];
+    syncLen = syncMarker.length;
+    syncBuffer = new byte[syncLen];
     this.in = in;
-    this.limitStart = limit;
-    this.limitEnd = limitStart + syncLen;
+    limitStart = limit;
+    limitEnd = limitStart + syncLen;
     this.isHeaderPresent = isHeaderPresent;
   }
 
@@ -66,11 +66,9 @@ public class StreamBlockletReader {
    * find the first position of sync_marker in input stream
    */
   private boolean sync() throws IOException {
-    int len = in.read(syncBuffer);
-    if (len < syncLen) {
+    if (!readBytesFromStream(syncBuffer, 0, syncLen)) {
       return false;
     }
-    pos += syncLen;
     boolean skipHeader = false;
     for (int i = 0; i < limitStart; i++) {
       int j = 0;
@@ -101,7 +99,9 @@ public class StreamBlockletReader {
   BlockletHeader readBlockletHeader() throws IOException {
     int len = readIntFromStream();
     byte[] b = new byte[len];
-    readBytesFromStream(b);
+    if (!readBytesFromStream(b, 0, len)) {
+      throw new EOFException("Failed to read blocklet header");
+    }
     BlockletHeader header = CarbonUtil.readBlockletHeader(b);
     rowNums = header.getBlocklet_info().getNum_rows();
     rowIndex = 0;
@@ -113,7 +113,9 @@ public class StreamBlockletReader {
     offset = 0;
     int len = readIntFromStream();
     byte[] b = new byte[len];
-    readBytesFromStream(b);
+    if (!readBytesFromStream(b, 0, len)) {
+      throw new EOFException("Failed to read blocklet data");
+    }
     compressor.rawUncompress(b, buffer);
   }
 
@@ -143,11 +145,9 @@ public class StreamBlockletReader {
       return false;
     }
     if (isAlreadySync) {
-      int v = in.read(syncBuffer);
-      if (v < syncLen) {
+      if (!readBytesFromStream(syncBuffer, 0, syncLen)) {
         return false;
       }
-      pos += syncLen;
     } else {
       isAlreadySync = true;
       if (!sync()) {
@@ -176,12 +176,23 @@ public class StreamBlockletReader {
     return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
   }
 
-  void readBytesFromStream(byte[] b) throws IOException {
-    int len = in.read(b, 0, b.length);
-    if (len < b.length) {
-      throw new EOFException();
+  /**
+   * Reads <code>len</code> bytes of data from the input stream into
+   * an array of bytes.
+   * @return <code>true</code> if reading data successfully, or
+   * <code>false</code> if there is no more data because the end of the stream has been reached.
+   */
+  boolean readBytesFromStream(byte[] b, int offset, int len) throws IOException {
+    int readLen = in.read(b, offset, len);
+    if (readLen < 0) {
+      return false;
+    }
+    pos += readLen;
+    if (readLen < len) {
+      return readBytesFromStream(b, offset + readLen, len - readLen);
+    } else {
+      return true;
     }
-    pos += b.length;
   }
 
   boolean readBoolean() {