You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/10/03 10:39:55 UTC
carbondata git commit: [HOTFIX] Fixed S3 metrics issue.
Repository: carbondata
Updated Branches:
refs/heads/master 2081bc87a -> 7d1fcb309
[HOTFIX] Fixed S3 metrics issue.
Problem: When data is read from S3, the metrics report more bytes read than the total size of the carbondata files.
Reason: This happens because carbondata uses dataInputStream.skip, which the S3 interface cannot handle properly:
it reads in a loop and ends up reading more data than required.
Solution: Use FSDataInputStream.seek instead of skip to fix this issue.
This closes #2789
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7d1fcb30
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7d1fcb30
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7d1fcb30
Branch: refs/heads/master
Commit: 7d1fcb3092a1e9da6c49f17c63c6217892e9e531
Parents: 2081bc8
Author: ravipesala <ra...@gmail.com>
Authored: Fri Sep 28 18:29:08 2018 +0530
Committer: kumarvishal09 <ku...@gmail.com>
Committed: Wed Oct 3 16:08:49 2018 +0530
----------------------------------------------------------------------
.../datastore/filesystem/AbstractDFSCarbonFile.java | 7 +++++--
.../apache/carbondata/core/reader/ThriftReader.java | 16 ++++++----------
2 files changed, 11 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7d1fcb30/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
index b1e476b..c764430 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
@@ -327,8 +327,11 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
CompressionCodec codec = new CompressionCodecFactory(hadoopConf).getCodecByName(codecName);
inputStream = codec.createInputStream(inputStream);
}
-
- return new DataInputStream(new BufferedInputStream(inputStream));
+ if (bufferSize <= 0 && inputStream instanceof FSDataInputStream) {
+ return (DataInputStream) inputStream;
+ } else {
+ return new DataInputStream(new BufferedInputStream(inputStream));
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7d1fcb30/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java b/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java
index 48d8345..f5ecda6 100644
--- a/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/reader/ThriftReader.java
@@ -25,6 +25,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
@@ -36,10 +37,6 @@ import org.apache.thrift.transport.TIOStreamTransport;
*/
public class ThriftReader {
/**
- * buffer size
- */
- private static final int bufferSize = 2048;
- /**
* File containing the objects.
*/
private String fileName;
@@ -101,7 +98,7 @@ public class ThriftReader {
public void open() throws IOException {
Configuration conf = configuration != null ? configuration : FileFactory.getConfiguration();
FileFactory.FileType fileType = FileFactory.getFileType(fileName);
- dataInputStream = FileFactory.getDataInputStream(fileName, fileType, bufferSize, conf);
+ dataInputStream = FileFactory.getDataInputStream(fileName, fileType, conf);
binaryIn = new TCompactProtocol(new TIOStreamTransport(dataInputStream));
}
@@ -109,7 +106,9 @@ public class ThriftReader {
* This method will set the position of stream from where data has to be read
*/
public void setReadOffset(long bytesToSkip) throws IOException {
- if (dataInputStream.skip(bytesToSkip) != bytesToSkip) {
+ if (dataInputStream instanceof FSDataInputStream) {
+ ((FSDataInputStream)dataInputStream).seek(bytesToSkip);
+ } else if (dataInputStream.skip(bytesToSkip) != bytesToSkip) {
throw new IOException("It doesn't set the offset properly");
}
}
@@ -118,10 +117,7 @@ public class ThriftReader {
* Checks if another objects is available by attempting to read another byte from the stream.
*/
public boolean hasNext() throws IOException {
- dataInputStream.mark(1);
- int val = dataInputStream.read();
- dataInputStream.reset();
- return val != -1;
+ return dataInputStream.available() > 0;
}
/**