You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/10/09 15:50:24 UTC

[22/45] carbondata git commit: [HOTFIX] Fixed S3 metrics issue.

[HOTFIX] Fixed S3 metrics issue.

Problem: When data is read from S3, the reported bytes-read metric exceeds the total size of the CarbonData files.
Reason: CarbonData moves the read position with DataInputStream.skip, which the S3 stream does not handle properly:
it reads in a loop and reads more data than required.
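Solution: Use FSDataInputStream.seek instead of skip. ThriftReader also no longer wraps the stream in a fixed 2048-byte buffer, and hasNext() uses available() instead of mark/read/reset.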

This closes #2789


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7d1fcb30
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7d1fcb30
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7d1fcb30

Branch: refs/heads/branch-1.5
Commit: 7d1fcb3092a1e9da6c49f17c63c6217892e9e531
Parents: 2081bc8
Author: ravipesala <ra...@gmail.com>
Authored: Fri Sep 28 18:29:08 2018 +0530
Committer: kumarvishal09 <ku...@gmail.com>
Committed: Wed Oct 3 16:08:49 2018 +0530

----------------------------------------------------------------------
 .../datastore/filesystem/AbstractDFSCarbonFile.java |  7 +++++--
 .../apache/carbondata/core/reader/ThriftReader.java | 16 ++++++----------
 2 files changed, 11 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/7d1fcb30/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
index b1e476b..c764430 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
@@ -327,8 +327,11 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
       CompressionCodec codec = new CompressionCodecFactory(hadoopConf).getCodecByName(codecName);
       inputStream = codec.createInputStream(inputStream);
     }
-
-    return new DataInputStream(new BufferedInputStream(inputStream));
+    if (bufferSize <= 0 && inputStream instanceof FSDataInputStream) {
+      return (DataInputStream) inputStream;
+    } else {
+      return new DataInputStream(new BufferedInputStream(inputStream));
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7d1fcb30/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java b/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java
index 48d8345..f5ecda6 100644
--- a/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java
@@ -25,6 +25,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TCompactProtocol;
@@ -36,10 +37,6 @@ import org.apache.thrift.transport.TIOStreamTransport;
  */
 public class ThriftReader {
   /**
-   * buffer size
-   */
-  private static final int bufferSize = 2048;
-  /**
    * File containing the objects.
    */
   private String fileName;
@@ -101,7 +98,7 @@ public class ThriftReader {
   public void open() throws IOException {
     Configuration conf = configuration != null ? configuration : FileFactory.getConfiguration();
     FileFactory.FileType fileType = FileFactory.getFileType(fileName);
-    dataInputStream = FileFactory.getDataInputStream(fileName, fileType, bufferSize, conf);
+    dataInputStream = FileFactory.getDataInputStream(fileName, fileType, conf);
     binaryIn = new TCompactProtocol(new TIOStreamTransport(dataInputStream));
   }
 
@@ -109,7 +106,9 @@ public class ThriftReader {
    * This method will set the position of stream from where data has to be read
    */
   public void setReadOffset(long bytesToSkip) throws IOException {
-    if (dataInputStream.skip(bytesToSkip) != bytesToSkip) {
+    if (dataInputStream instanceof FSDataInputStream) {
+      ((FSDataInputStream)dataInputStream).seek(bytesToSkip);
+    } else if (dataInputStream.skip(bytesToSkip) != bytesToSkip) {
       throw new IOException("It doesn't set the offset properly");
     }
   }
@@ -118,10 +117,7 @@ public class ThriftReader {
    * Checks if another objects is available by attempting to read another byte from the stream.
    */
   public boolean hasNext() throws IOException {
-    dataInputStream.mark(1);
-    int val = dataInputStream.read();
-    dataInputStream.reset();
-    return val != -1;
+    return dataInputStream.available() > 0;
   }
 
   /**