Posted to commits@crail.apache.org by ps...@apache.org on 2018/06/26 07:55:35 UTC

[1/2] incubator-crail git commit: Delegate implementation of CrailHDFS to CrailHadoopFileSystem

Repository: incubator-crail
Updated Branches:
  refs/heads/master 76c3fe83c -> 90703f297


Delegate implementation of CrailHDFS to CrailHadoopFileSystem

Implement AbstractFileSystem by delegating function calls to
CrailHadoopFileSystem. The same could also be achieved by using
DelegateToFileSystem from hadoop-common, but that results in additional
operations (e.g., exists(path) and checkPath(path)).
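
For contrast, a minimal sketch of the DelegateToFileSystem route mentioned
above. The class name CrailDelegateFs is hypothetical, not part of the
module; the five-argument super constructor (uri, wrapped FileSystem, conf,
scheme, authorityNeeded) is the one hadoop-common provides:

// Hypothetical sketch of the DelegateToFileSystem alternative; not part of
// this commit. hadoop-common's base class fills in all AbstractFileSystem
// methods by forwarding to the wrapped FileSystem instance.
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DelegateToFileSystem;

public class CrailDelegateFs extends DelegateToFileSystem {
	CrailDelegateFs(URI uri, Configuration conf)
			throws IOException, URISyntaxException {
		// (theUri, theFsImpl, conf, supportedScheme, authorityNeeded)
		super(uri, new CrailHadoopFileSystem(), conf, "crail", true);
	}
}

The cost is that the base class routes calls through checkPath(path) and
probes for the parent with exists()-style lookups on create, which the
hand-written delegation in the diff below avoids.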

https://issues.apache.org/jira/projects/CRAIL/issues/CRAIL-27

Signed-off-by: Patrick Stuedi <ps...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-crail/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crail/commit/b31bfd78
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crail/tree/b31bfd78
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crail/diff/b31bfd78

Branch: refs/heads/master
Commit: b31bfd786786e82e6df8e7c36e7083ea66ae7d23
Parents: 76c3fe8
Author: Patrick Stuedi <ps...@apache.org>
Authored: Mon Jun 25 13:16:01 2018 +0200
Committer: Patrick Stuedi <ps...@apache.org>
Committed: Mon Jun 25 13:16:01 2018 +0200

----------------------------------------------------------------------
 .../java/org/apache/crail/hdfs/CrailHDFS.java   | 191 ++-----------------
 .../crail/hdfs/CrailHadoopFileSystem.java       |  24 +--
 2 files changed, 24 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/b31bfd78/hdfs/src/main/java/org/apache/crail/hdfs/CrailHDFS.java
----------------------------------------------------------------------
diff --git a/hdfs/src/main/java/org/apache/crail/hdfs/CrailHDFS.java b/hdfs/src/main/java/org/apache/crail/hdfs/CrailHDFS.java
index 05b1b28..16568ee 100644
--- a/hdfs/src/main/java/org/apache/crail/hdfs/CrailHDFS.java
+++ b/hdfs/src/main/java/org/apache/crail/hdfs/CrailHDFS.java
@@ -22,24 +22,8 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.ArrayList;
 import java.util.EnumSet;
-import java.util.Iterator;
-
 import org.slf4j.Logger;
-import org.apache.crail.CrailBlockLocation;
-import org.apache.crail.CrailBufferedInputStream;
-import org.apache.crail.CrailBufferedOutputStream;
-import org.apache.crail.CrailDirectory;
-import org.apache.crail.CrailStore;
-import org.apache.crail.CrailFile;
-import org.apache.crail.CrailLocationClass;
-import org.apache.crail.CrailNode;
-import org.apache.crail.CrailNodeType;
-import org.apache.crail.CrailStorageClass;
-import org.apache.crail.conf.CrailConfiguration;
-import org.apache.crail.conf.CrailConstants;
-import org.apache.crail.rpc.RpcErrors;
 import org.apache.crail.utils.CrailUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.AbstractFileSystem;
@@ -59,26 +43,16 @@ import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
 
 public class CrailHDFS extends AbstractFileSystem {
 	private static final Logger LOG = CrailUtils.getLogger();
-	private CrailStore dfs;
-	private Path workingDir;
+	private CrailHadoopFileSystem dfs;
 	
 	public CrailHDFS(final URI uri, final Configuration conf) throws IOException, URISyntaxException {
 		super(uri, "crail", true, 9000);
-		
-		try {
-			CrailConfiguration crailConf = new CrailConfiguration();
-			this.dfs = CrailStore.newInstance(crailConf);
-			Path _workingDir = new Path("/user/" + CrailConstants.USER);
-			this.workingDir = new Path("/user/" + CrailConstants.USER).makeQualified(uri, _workingDir);
-			LOG.info("CrailHDFS initialization done..");
-		} catch(Exception e){
-			throw new IOException(e);
-		}
+		this.dfs = new CrailHadoopFileSystem();
+		dfs.initialize(uri, conf);
 	}
 
 	@Override
@@ -88,125 +62,42 @@ public class CrailHDFS extends AbstractFileSystem {
 
 	@Override
 	public FsServerDefaults getServerDefaults() throws IOException {
-		return new FsServerDefaults(CrailConstants.BLOCK_SIZE, 512, 64*1024, (short) 1, 4096, false, (long) 0, DataChecksum.Type.CRC32);
+		return dfs.getServerDefaults(dfs.getWorkingDirectory());
 	}
 
 	@Override
 	public FSDataOutputStream createInternal(Path path, EnumSet<CreateFlag> flag, FsPermission absolutePermission, int bufferSize, short replication, long blockSize, Progressable progress, ChecksumOpt checksumOpt, boolean createParent) throws AccessControlException, FileAlreadyExistsException, FileNotFoundException, ParentNotDirectoryException, UnsupportedFileSystemException, UnresolvedLinkException, IOException {
-		CrailFile fileInfo = null;
-		try {
-			fileInfo = dfs.create(path.toUri().getRawPath(), CrailNodeType.DATAFILE, CrailStorageClass.PARENT, CrailLocationClass.PARENT, true).get().asFile();
-		} catch(Exception e){
-			if (e.getMessage().contains(RpcErrors.messages[RpcErrors.ERR_PARENT_MISSING])){
-				fileInfo = null;
-			} else {
-				throw new IOException(e);
-			}
-		}
-		
-		if (fileInfo == null){
-			Path parent = path.getParent();
-			this.mkdir(parent, FsPermission.getDirDefault(), true);
-			try {
-				fileInfo = dfs.create(path.toUri().getRawPath(), CrailNodeType.DATAFILE, CrailStorageClass.PARENT, CrailLocationClass.PARENT, true).get().asFile();
-			} catch(Exception e){
-				throw new IOException(e);
-			}
-		}
-		
-		CrailBufferedOutputStream outputStream = null;
-		if (fileInfo != null){
-			try {
-				fileInfo.syncDir();
-				outputStream = fileInfo.getBufferedOutputStream(Integer.MAX_VALUE);
-			} catch (Exception e) {
-				throw new IOException(e);
-			}			
-		} else {
-			throw new IOException("Failed to create file, path " + path.toString());
-		}
-		
-		if (outputStream != null){
-			return new CrailHDFSOutputStream(outputStream, statistics);					
-		} else {
-			throw new IOException("Failed to create file, path " + path.toString());
-		}		
+		return dfs.create(path, absolutePermission, false, bufferSize, replication, blockSize, progress);
 	}
 
 	@Override
 	public void mkdir(Path path, FsPermission permission, boolean createParent) throws AccessControlException, FileAlreadyExistsException, FileNotFoundException, UnresolvedLinkException, IOException {
-		try {
-			CrailDirectory file = dfs.create(path.toUri().getRawPath(), CrailNodeType.DIRECTORY, CrailStorageClass.PARENT, CrailLocationClass.DEFAULT, true).get().asDirectory();
-			file.syncDir();
-		} catch(Exception e){
-			if (e.getMessage().contains(RpcErrors.messages[RpcErrors.ERR_PARENT_MISSING])){
-				Path parent = path.getParent();
-				mkdir(parent, permission, createParent);
-				mkdir(path, permission, createParent);
-			} else if (e.getMessage().contains(RpcErrors.messages[RpcErrors.ERR_FILE_EXISTS])){
-			} else {
-				throw new IOException(e);
-			}			
-		}
+		dfs.mkdirs(path, permission);
 	}
 
 	@Override
 	public boolean delete(Path path, boolean recursive) throws AccessControlException, FileNotFoundException, UnresolvedLinkException, IOException {
-		try {
-			CrailNode file = dfs.delete(path.toUri().getRawPath(), recursive).get();
-			if (file != null){
-				file.syncDir();
-			}
-			return file != null;
-		} catch(Exception e){
-			throw new IOException(e);
-		}
+		return dfs.delete(path, recursive);
 	}
 
 	@Override
 	public FSDataInputStream open(Path path, int bufferSize) throws AccessControlException, FileNotFoundException, UnresolvedLinkException, IOException {
-		CrailFile fileInfo = null;
-		try {
-			fileInfo = dfs.lookup(path.toUri().getRawPath()).get().asFile();
-		} catch(Exception e){
-			throw new IOException(e);
-		}
-		
-		CrailBufferedInputStream inputStream = null;
-		if (fileInfo != null){
-			try {
-				inputStream = fileInfo.getBufferedInputStream(fileInfo.getCapacity());
-			} catch(Exception e){
-				throw new IOException(e);
-			}
-		}
-		
-		if (inputStream != null){
-			return new CrailHDFSInputStream(inputStream);
-		} else {
-			throw new IOException("Failed to open file, path " + path.toString());
-		}
+		return dfs.open(path, bufferSize);
 	}
 
 	@Override
 	public boolean setReplication(Path f, short replication) throws AccessControlException, FileNotFoundException, UnresolvedLinkException, IOException {
-		return true;
+		return dfs.setReplication(f, replication);
 	}
 
 	@Override
 	public void renameInternal(Path src, Path dst) throws AccessControlException, FileAlreadyExistsException, FileNotFoundException, ParentNotDirectoryException, UnresolvedLinkException, IOException {
-		try {
-			CrailNode file = dfs.rename(src.toUri().getRawPath(), dst.toUri().getRawPath()).get();
-			if (file != null){
-				file.syncDir();
-			}
-		} catch(Exception e){
-			throw new IOException(e);
-		}
+		dfs.rename(src, dst);
 	}
 
 	@Override
 	public void setPermission(Path f, FsPermission permission) throws AccessControlException, FileNotFoundException, UnresolvedLinkException, IOException {
+		dfs.setPermission(f, permission);
 	}
 
 	@Override
@@ -224,76 +115,26 @@ public class CrailHDFS extends AbstractFileSystem {
 
 	@Override
 	public FileStatus getFileStatus(Path path) throws AccessControlException, FileNotFoundException, UnresolvedLinkException, IOException {
-		CrailNode directFile = null;
-		try {
-			directFile = dfs.lookup(path.toUri().getRawPath()).get();
-		} catch(Exception e){
-			throw new IOException(e);
-		}
-		if (directFile == null){
-			throw new FileNotFoundException("filename " + path);
-		}
-		
-		FsPermission permission = FsPermission.getFileDefault();
-		if (directFile.getType().isDirectory()) {
-			permission = FsPermission.getDirDefault();
-		}		
-		FileStatus status = new FileStatus(directFile.getCapacity(), directFile.getType().isContainer(), CrailConstants.SHADOW_REPLICATION, CrailConstants.BLOCK_SIZE, directFile.getModificationTime(), directFile.getModificationTime(), permission, CrailConstants.USER, CrailConstants.USER, path.makeQualified(this.getUri(), this.workingDir));
-		return status;
+		return dfs.getFileStatus(path);
 	}
 
 	@Override
 	public BlockLocation[] getFileBlockLocations(Path path, long start, long len) throws AccessControlException, FileNotFoundException, UnresolvedLinkException, IOException {
-		try {
-			CrailBlockLocation[] _locations = dfs.lookup(path.toUri().getRawPath()).get().asFile().getBlockLocations(start, len);
-			BlockLocation[] locations = new BlockLocation[_locations.length];
-			for (int i = 0; i < locations.length; i++){
-				locations[i] = new BlockLocation();
-				locations[i].setOffset(_locations[i].getOffset());
-				locations[i].setLength(_locations[i].getLength());
-				locations[i].setNames(_locations[i].getNames());
-				locations[i].setHosts(_locations[i].getHosts());
-				locations[i].setTopologyPaths(_locations[i].getTopology());
-				
-			}
-			return locations;
-		} catch(Exception e){
-			throw new IOException(e);
-		}
+		return dfs.getFileBlockLocations(path, start, len);
 	}
 
 	@Override
 	public FsStatus getFsStatus() throws AccessControlException, FileNotFoundException, IOException {
-		return new FsStatus(1000000000, 1000, 1000000000 - 1000);
+		return dfs.getStatus();
 	}
 
 	@Override
 	public FileStatus[] listStatus(Path path) throws AccessControlException, FileNotFoundException, UnresolvedLinkException, IOException {
-		try {
-			CrailNode node = dfs.lookup(path.toUri().getRawPath()).get();
-			Iterator<String> iter = node.asContainer().listEntries();
-			ArrayList<FileStatus> statusList = new ArrayList<FileStatus>();
-			while(iter.hasNext()){
-				String filepath = iter.next();
-				CrailNode directFile = dfs.lookup(filepath).get();
-				if (directFile != null){
-					FsPermission permission = FsPermission.getFileDefault();
-					if (directFile.getType().isDirectory()) {
-						permission = FsPermission.getDirDefault();
-					}
-					FileStatus status = new FileStatus(directFile.getCapacity(), directFile.getType().isContainer(), CrailConstants.SHADOW_REPLICATION, CrailConstants.BLOCK_SIZE, directFile.getModificationTime(), directFile.getModificationTime(), permission, CrailConstants.USER, CrailConstants.USER, new Path(filepath).makeQualified(this.getUri(), workingDir));	
-					statusList.add(status);
-				}
-			}
-			FileStatus[] list = new FileStatus[statusList.size()];
-			statusList.toArray(list);
-			return list;
-		} catch(Exception e){
-			throw new FileNotFoundException(path.toUri().getRawPath());
-		}
+		return dfs.listStatus(path);
 	}
 
 	@Override
 	public void setVerifyChecksum(boolean verifyChecksum) throws AccessControlException, IOException {
+		dfs.setVerifyChecksum(verifyChecksum);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/b31bfd78/hdfs/src/main/java/org/apache/crail/hdfs/CrailHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/hdfs/src/main/java/org/apache/crail/hdfs/CrailHadoopFileSystem.java b/hdfs/src/main/java/org/apache/crail/hdfs/CrailHadoopFileSystem.java
index 257d613..84c33c4 100644
--- a/hdfs/src/main/java/org/apache/crail/hdfs/CrailHadoopFileSystem.java
+++ b/hdfs/src/main/java/org/apache/crail/hdfs/CrailHadoopFileSystem.java
@@ -87,6 +87,7 @@ public class CrailHadoopFileSystem extends FileSystem {
 	}	
 
 	public FSDataInputStream open(Path path, int bufferSize) throws IOException {
+		statistics.incrementReadOps(1);
 		CrailFile fileInfo = null;
 		try {
 			fileInfo = dfs.lookup(path.toUri().getRawPath()).get().asFile();
@@ -101,6 +102,7 @@ public class CrailHadoopFileSystem extends FileSystem {
 	public FSDataOutputStream create(Path path, FsPermission permission,
 			boolean overwrite, int bufferSize, short replication,
 			long blockSize, Progressable progress) throws IOException {
+		statistics.incrementWriteOps(1);
 		CrailFile fileInfo = null;
 		try {
 			fileInfo = dfs.create(path.toUri().getRawPath(), CrailNodeType.DATAFILE, CrailStorageClass.PARENT, CrailLocationClass.PARENT, true).get().asFile();
@@ -147,6 +149,7 @@ public class CrailHadoopFileSystem extends FileSystem {
 	@Override
 	public boolean rename(Path src, Path dst) throws IOException {
 		try {
+			statistics.incrementWriteOps(1);
 			CrailNode file = dfs.rename(src.toUri().getRawPath(), dst.toUri().getRawPath()).get();
 			if (file != null){
 				file.syncDir();
@@ -160,6 +163,7 @@ public class CrailHadoopFileSystem extends FileSystem {
 	@Override
 	public boolean delete(Path path, boolean recursive) throws IOException {
 		try {
+			statistics.incrementWriteOps(1);
 			CrailNode file = dfs.delete(path.toUri().getRawPath(), recursive).get();
 			if (file != null){
 				file.syncDir();
@@ -209,6 +213,7 @@ public class CrailHadoopFileSystem extends FileSystem {
 	@Override
 	public boolean mkdirs(Path path, FsPermission permission) throws IOException {
 		try {
+			statistics.incrementWriteOps(1);
 			CrailDirectory file = dfs.create(path.toUri().getRawPath(), CrailNodeType.DIRECTORY, CrailStorageClass.PARENT, CrailLocationClass.DEFAULT, true).get().asDirectory();
 			file.syncDir();
 			return true;
@@ -227,6 +232,7 @@ public class CrailHadoopFileSystem extends FileSystem {
 
 	@Override
 	public FileStatus getFileStatus(Path path) throws IOException {
+		statistics.incrementReadOps(1);
 		CrailNode directFile = null;
 		try {
 			directFile = dfs.lookup(path.toUri().getRawPath()).get();
@@ -246,27 +252,13 @@ public class CrailHadoopFileSystem extends FileSystem {
 
 	@Override
 	public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
-		try {
-			CrailBlockLocation[] _locations = dfs.lookup(file.getPath().toUri().getRawPath()).get().asFile().getBlockLocations(start, len);
-			BlockLocation[] locations = new BlockLocation[_locations.length];
-			for (int i = 0; i < locations.length; i++){
-				locations[i] = new BlockLocation();
-				locations[i].setOffset(_locations[i].getOffset());
-				locations[i].setLength(_locations[i].getLength());
-				locations[i].setNames(_locations[i].getNames());
-				locations[i].setHosts(_locations[i].getHosts());
-				locations[i].setTopologyPaths(_locations[i].getTopology());
-				
-			}			
-			return locations;
-		} catch(Exception e){
-			throw new IOException(e);
-		}
+		return this.getFileBlockLocations(file.getPath(), start, len);
 	}
 
 	@Override
 	public BlockLocation[] getFileBlockLocations(Path path, long start, long len) throws IOException {
 		try {
+			statistics.incrementReadOps(1);
 			CrailBlockLocation[] _locations = dfs.lookup(path.toUri().getRawPath()).get().asFile().getBlockLocations(start, len);
 			BlockLocation[] locations = new BlockLocation[_locations.length];
 			for (int i = 0; i < locations.length; i++){


[2/2] incubator-crail git commit: Make sure statistics are updated across all classes in the Crail HDFS module

Posted by ps...@apache.org.
Make sure statistics are updated across all classes in the Crail HDFS
module

Previously, statistics were only captured in CrailHDFSOutputStream and in
parts of CrailHadoopFileSystem.
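
A hedged sketch of the pattern this commit applies, assuming only
hadoop-common on the classpath: the stream holds the owning FileSystem's
Statistics object and bumps the bytes-read counter after each successful
read. The class name StatisticsTrackingStream is illustrative, not part of
the module:

// Illustrative only; mirrors the updateStats() pattern in the diff below.
import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.fs.FileSystem.Statistics;

class StatisticsTrackingStream extends InputStream {
	private final InputStream in;
	private final Statistics stats; // may be null, hence the guard below

	StatisticsTrackingStream(InputStream in, Statistics stats) {
		this.in = in;
		this.stats = stats;
	}

	@Override
	public int read(byte[] b, int off, int len) throws IOException {
		int res = in.read(b, off, len);
		updateStats(res); // res is the byte count read, or -1 at EOF
		return res;
	}

	@Override
	public int read() throws IOException {
		int value = in.read();
		updateStats(value < 0 ? -1 : 1); // one byte on success
		return value;
	}

	private void updateStats(long len) {
		if (stats != null && len > 0) {
			stats.incrementBytesRead(len);
		}
	}
}

The null check matters because a Statistics object is only available when the
stream is handed one by its FileSystem, as the diff below does via the
protected statistics field.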

https://issues.apache.org/jira/browse/CRAIL-26

Signed-off-by: Patrick Stuedi <ps...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-crail/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crail/commit/90703f29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crail/tree/90703f29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crail/diff/90703f29

Branch: refs/heads/master
Commit: 90703f297ce4c5bc3eb7820101861b480291c27f
Parents: b31bfd7
Author: Patrick Stuedi <ps...@apache.org>
Authored: Mon Jun 25 13:46:45 2018 +0200
Committer: Patrick Stuedi <ps...@apache.org>
Committed: Mon Jun 25 13:46:45 2018 +0200

----------------------------------------------------------------------
 .../apache/crail/hdfs/CrailHDFSInputStream.java | 54 ++++++++++++++++----
 .../crail/hdfs/CrailHadoopFileSystem.java       |  2 +-
 2 files changed, 44 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/90703f29/hdfs/src/main/java/org/apache/crail/hdfs/CrailHDFSInputStream.java
----------------------------------------------------------------------
diff --git a/hdfs/src/main/java/org/apache/crail/hdfs/CrailHDFSInputStream.java b/hdfs/src/main/java/org/apache/crail/hdfs/CrailHDFSInputStream.java
index d90ce81..877e0f0 100644
--- a/hdfs/src/main/java/org/apache/crail/hdfs/CrailHDFSInputStream.java
+++ b/hdfs/src/main/java/org/apache/crail/hdfs/CrailHDFSInputStream.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.PositionedReadable;
 import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.slf4j.Logger;
 
 
@@ -36,9 +37,10 @@ public class CrailHDFSInputStream extends FSDataInputStream {
 	private static final Logger LOG = CrailUtils.getLogger();
 	
 	private CrailBufferedInputStream inputStream;
+	private Statistics stats;
 	
-	public CrailHDFSInputStream(CrailBufferedInputStream stream) {
-		super(new CrailSeekable(stream));
+	public CrailHDFSInputStream(CrailBufferedInputStream stream, Statistics stats) {
+		super(new CrailSeekable(stream, stats));
 		LOG.info("new HDFS stream");
 		this.inputStream = stream;
 	}
@@ -50,13 +52,17 @@ public class CrailHDFSInputStream extends FSDataInputStream {
 
 	@Override
 	public int read(ByteBuffer buf) throws IOException {
-		return inputStream.read(buf);
+		int res = inputStream.read(buf);
+		updateStats(res);
+		return res;
 	}
 
 	@Override
 	public int read(long position, byte[] buffer, int offset, int length)
 			throws IOException {
-		return inputStream.read(position, buffer, offset, length);
+		int res = inputStream.read(position, buffer, offset, length);
+		updateStats(res);
+		return res;
 	}
 	
 	@Override
@@ -84,7 +90,9 @@ public class CrailHDFSInputStream extends FSDataInputStream {
 
 	@Override
 	public int read() throws IOException {
-		return inputStream.read();
+		int res = inputStream.read();
+		updateStats(Integer.BYTES);
+		return res;
 	}
 
 	@Override
@@ -92,26 +100,40 @@ public class CrailHDFSInputStream extends FSDataInputStream {
 		return inputStream.available();
 	}
 	
+	private void updateStats(long len) {
+		if (stats != null && len > 0) {
+			stats.incrementBytesRead(len);
+		}
+	}
+	
 	public static class CrailSeekable extends InputStream implements Seekable, PositionedReadable, ByteBufferReadable {
 		private CrailBufferedInputStream inputStream;
+		private Statistics stats;
 		
-		public CrailSeekable(CrailBufferedInputStream inputStream) {
+		public CrailSeekable(CrailBufferedInputStream inputStream, Statistics stats) {
 			this.inputStream = inputStream;
+			this.stats = stats;
 		}
 
 		@Override
 		public int read() throws IOException {
-			return inputStream.read();
+			int value = inputStream.read();
+			updateStats(Integer.BYTES);
+			return value;
 		}
 
 		@Override
 		public int read(byte[] b) throws IOException {
-			return inputStream.read(b);
+			int res = inputStream.read(b);
+			updateStats(Integer.BYTES);
+			return res;
 		}
 
 		@Override
 		public int read(byte[] b, int off, int len) throws IOException {
-			return inputStream.read(b, off, len);
+			int res = inputStream.read(b, off, len);
+			updateStats(Integer.BYTES);
+			return res;
 		}
 
 		@Override
@@ -146,13 +168,17 @@ public class CrailHDFSInputStream extends FSDataInputStream {
 
 		@Override
 		public int read(ByteBuffer dataBuf) throws IOException {
-			return inputStream.read(dataBuf);
+			int res = inputStream.read(dataBuf);
+			updateStats(Integer.BYTES);
+			return res;
 		}
 
 		@Override
 		public int read(long position, byte[] buffer, int offset, int length)
 				throws IOException {
-			return inputStream.read(position, buffer, offset, length);
+			int res = inputStream.read(position, buffer, offset, length);
+			updateStats(Integer.BYTES);
+			return res;
 		}
 
 		@Override
@@ -186,6 +212,12 @@ public class CrailHDFSInputStream extends FSDataInputStream {
 		@Override
 		public boolean seekToNewSource(long targetPos) throws IOException {
 			return false;
+		}
+		
+		private void updateStats(long len) {
+			if (stats != null && len > 0) {
+				stats.incrementBytesRead(len);
+			}
 		}		
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/90703f29/hdfs/src/main/java/org/apache/crail/hdfs/CrailHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/hdfs/src/main/java/org/apache/crail/hdfs/CrailHadoopFileSystem.java b/hdfs/src/main/java/org/apache/crail/hdfs/CrailHadoopFileSystem.java
index 84c33c4..404d845 100644
--- a/hdfs/src/main/java/org/apache/crail/hdfs/CrailHadoopFileSystem.java
+++ b/hdfs/src/main/java/org/apache/crail/hdfs/CrailHadoopFileSystem.java
@@ -92,7 +92,7 @@ public class CrailHadoopFileSystem extends FileSystem {
 		try {
 			fileInfo = dfs.lookup(path.toUri().getRawPath()).get().asFile();
 			CrailBufferedInputStream inputStream = fileInfo.getBufferedInputStream(fileInfo.getCapacity());
-			return new CrailHDFSInputStream(inputStream);
+			return new CrailHDFSInputStream(inputStream, statistics);
 		} catch (Exception e) {
 			throw new IOException(e);
 		}