You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crail.apache.org by ps...@apache.org on 2018/06/26 07:55:35 UTC
[1/2] incubator-crail git commit: Delegate implementation of
CrailHDFS to CrailHadoopFileSystem
Repository: incubator-crail
Updated Branches:
refs/heads/master 76c3fe83c -> 90703f297
Delegate implementation of CrailHDFS to CrailHadoopFileSystem
Implement AbstractFileSystem by delegating function calls to
CrailHadoopFileSystem. The same can also be done by using
DelegateToFileSystem from hadoop-common, but this results in additional
operations (e.g., exists(path) and checkPath(path)).
https://issues.apache.org/jira/projects/CRAIL/issues/CRAIL-27
Signed-off-by: Patrick Stuedi <ps...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-crail/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crail/commit/b31bfd78
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crail/tree/b31bfd78
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crail/diff/b31bfd78
Branch: refs/heads/master
Commit: b31bfd786786e82e6df8e7c36e7083ea66ae7d23
Parents: 76c3fe8
Author: Patrick Stuedi <ps...@apache.org>
Authored: Mon Jun 25 13:16:01 2018 +0200
Committer: Patrick Stuedi <ps...@apache.org>
Committed: Mon Jun 25 13:16:01 2018 +0200
----------------------------------------------------------------------
.../java/org/apache/crail/hdfs/CrailHDFS.java | 191 ++-----------------
.../crail/hdfs/CrailHadoopFileSystem.java | 24 +--
2 files changed, 24 insertions(+), 191 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/b31bfd78/hdfs/src/main/java/org/apache/crail/hdfs/CrailHDFS.java
----------------------------------------------------------------------
diff --git a/hdfs/src/main/java/org/apache/crail/hdfs/CrailHDFS.java b/hdfs/src/main/java/org/apache/crail/hdfs/CrailHDFS.java
index 05b1b28..16568ee 100644
--- a/hdfs/src/main/java/org/apache/crail/hdfs/CrailHDFS.java
+++ b/hdfs/src/main/java/org/apache/crail/hdfs/CrailHDFS.java
@@ -22,24 +22,8 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.ArrayList;
import java.util.EnumSet;
-import java.util.Iterator;
-
import org.slf4j.Logger;
-import org.apache.crail.CrailBlockLocation;
-import org.apache.crail.CrailBufferedInputStream;
-import org.apache.crail.CrailBufferedOutputStream;
-import org.apache.crail.CrailDirectory;
-import org.apache.crail.CrailStore;
-import org.apache.crail.CrailFile;
-import org.apache.crail.CrailLocationClass;
-import org.apache.crail.CrailNode;
-import org.apache.crail.CrailNodeType;
-import org.apache.crail.CrailStorageClass;
-import org.apache.crail.conf.CrailConfiguration;
-import org.apache.crail.conf.CrailConstants;
-import org.apache.crail.rpc.RpcErrors;
import org.apache.crail.utils.CrailUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem;
@@ -59,26 +43,16 @@ import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;
public class CrailHDFS extends AbstractFileSystem {
private static final Logger LOG = CrailUtils.getLogger();
- private CrailStore dfs;
- private Path workingDir;
+ private CrailHadoopFileSystem dfs;
public CrailHDFS(final URI uri, final Configuration conf) throws IOException, URISyntaxException {
super(uri, "crail", true, 9000);
-
- try {
- CrailConfiguration crailConf = new CrailConfiguration();
- this.dfs = CrailStore.newInstance(crailConf);
- Path _workingDir = new Path("/user/" + CrailConstants.USER);
- this.workingDir = new Path("/user/" + CrailConstants.USER).makeQualified(uri, _workingDir);
- LOG.info("CrailHDFS initialization done..");
- } catch(Exception e){
- throw new IOException(e);
- }
+ this.dfs = new CrailHadoopFileSystem();
+ dfs.initialize(uri, conf);
}
@Override
@@ -88,125 +62,42 @@ public class CrailHDFS extends AbstractFileSystem {
@Override
public FsServerDefaults getServerDefaults() throws IOException {
- return new FsServerDefaults(CrailConstants.BLOCK_SIZE, 512, 64*1024, (short) 1, 4096, false, (long) 0, DataChecksum.Type.CRC32);
+ return dfs.getServerDefaults(dfs.getWorkingDirectory());
}
@Override
public FSDataOutputStream createInternal(Path path, EnumSet<CreateFlag> flag, FsPermission absolutePermission, int bufferSize, short replication, long blockSize, Progressable progress, ChecksumOpt checksumOpt, boolean createParent) throws AccessControlException, FileAlreadyExistsException, FileNotFoundException, ParentNotDirectoryException, UnsupportedFileSystemException, UnresolvedLinkException, IOException {
- CrailFile fileInfo = null;
- try {
- fileInfo = dfs.create(path.toUri().getRawPath(), CrailNodeType.DATAFILE, CrailStorageClass.PARENT, CrailLocationClass.PARENT, true).get().asFile();
- } catch(Exception e){
- if (e.getMessage().contains(RpcErrors.messages[RpcErrors.ERR_PARENT_MISSING])){
- fileInfo = null;
- } else {
- throw new IOException(e);
- }
- }
-
- if (fileInfo == null){
- Path parent = path.getParent();
- this.mkdir(parent, FsPermission.getDirDefault(), true);
- try {
- fileInfo = dfs.create(path.toUri().getRawPath(), CrailNodeType.DATAFILE, CrailStorageClass.PARENT, CrailLocationClass.PARENT, true).get().asFile();
- } catch(Exception e){
- throw new IOException(e);
- }
- }
-
- CrailBufferedOutputStream outputStream = null;
- if (fileInfo != null){
- try {
- fileInfo.syncDir();
- outputStream = fileInfo.getBufferedOutputStream(Integer.MAX_VALUE);
- } catch (Exception e) {
- throw new IOException(e);
- }
- } else {
- throw new IOException("Failed to create file, path " + path.toString());
- }
-
- if (outputStream != null){
- return new CrailHDFSOutputStream(outputStream, statistics);
- } else {
- throw new IOException("Failed to create file, path " + path.toString());
- }
+ return dfs.create(path, absolutePermission, false, bufferSize, replication, blockSize, progress);
}
@Override
public void mkdir(Path path, FsPermission permission, boolean createParent) throws AccessControlException, FileAlreadyExistsException, FileNotFoundException, UnresolvedLinkException, IOException {
- try {
- CrailDirectory file = dfs.create(path.toUri().getRawPath(), CrailNodeType.DIRECTORY, CrailStorageClass.PARENT, CrailLocationClass.DEFAULT, true).get().asDirectory();
- file.syncDir();
- } catch(Exception e){
- if (e.getMessage().contains(RpcErrors.messages[RpcErrors.ERR_PARENT_MISSING])){
- Path parent = path.getParent();
- mkdir(parent, permission, createParent);
- mkdir(path, permission, createParent);
- } else if (e.getMessage().contains(RpcErrors.messages[RpcErrors.ERR_FILE_EXISTS])){
- } else {
- throw new IOException(e);
- }
- }
+ dfs.mkdirs(path, permission);
}
@Override
public boolean delete(Path path, boolean recursive) throws AccessControlException, FileNotFoundException, UnresolvedLinkException, IOException {
- try {
- CrailNode file = dfs.delete(path.toUri().getRawPath(), recursive).get();
- if (file != null){
- file.syncDir();
- }
- return file != null;
- } catch(Exception e){
- throw new IOException(e);
- }
+ return dfs.delete(path, recursive);
}
@Override
public FSDataInputStream open(Path path, int bufferSize) throws AccessControlException, FileNotFoundException, UnresolvedLinkException, IOException {
- CrailFile fileInfo = null;
- try {
- fileInfo = dfs.lookup(path.toUri().getRawPath()).get().asFile();
- } catch(Exception e){
- throw new IOException(e);
- }
-
- CrailBufferedInputStream inputStream = null;
- if (fileInfo != null){
- try {
- inputStream = fileInfo.getBufferedInputStream(fileInfo.getCapacity());
- } catch(Exception e){
- throw new IOException(e);
- }
- }
-
- if (inputStream != null){
- return new CrailHDFSInputStream(inputStream);
- } else {
- throw new IOException("Failed to open file, path " + path.toString());
- }
+ return dfs.open(path, bufferSize);
}
@Override
public boolean setReplication(Path f, short replication) throws AccessControlException, FileNotFoundException, UnresolvedLinkException, IOException {
- return true;
+ return dfs.setReplication(f, replication);
}
@Override
public void renameInternal(Path src, Path dst) throws AccessControlException, FileAlreadyExistsException, FileNotFoundException, ParentNotDirectoryException, UnresolvedLinkException, IOException {
- try {
- CrailNode file = dfs.rename(src.toUri().getRawPath(), dst.toUri().getRawPath()).get();
- if (file != null){
- file.syncDir();
- }
- } catch(Exception e){
- throw new IOException(e);
- }
+ dfs.rename(src, dst);
}
@Override
public void setPermission(Path f, FsPermission permission) throws AccessControlException, FileNotFoundException, UnresolvedLinkException, IOException {
+ dfs.setPermission(f, permission);
}
@Override
@@ -224,76 +115,26 @@ public class CrailHDFS extends AbstractFileSystem {
@Override
public FileStatus getFileStatus(Path path) throws AccessControlException, FileNotFoundException, UnresolvedLinkException, IOException {
- CrailNode directFile = null;
- try {
- directFile = dfs.lookup(path.toUri().getRawPath()).get();
- } catch(Exception e){
- throw new IOException(e);
- }
- if (directFile == null){
- throw new FileNotFoundException("filename " + path);
- }
-
- FsPermission permission = FsPermission.getFileDefault();
- if (directFile.getType().isDirectory()) {
- permission = FsPermission.getDirDefault();
- }
- FileStatus status = new FileStatus(directFile.getCapacity(), directFile.getType().isContainer(), CrailConstants.SHADOW_REPLICATION, CrailConstants.BLOCK_SIZE, directFile.getModificationTime(), directFile.getModificationTime(), permission, CrailConstants.USER, CrailConstants.USER, path.makeQualified(this.getUri(), this.workingDir));
- return status;
+ return dfs.getFileStatus(path);
}
@Override
public BlockLocation[] getFileBlockLocations(Path path, long start, long len) throws AccessControlException, FileNotFoundException, UnresolvedLinkException, IOException {
- try {
- CrailBlockLocation[] _locations = dfs.lookup(path.toUri().getRawPath()).get().asFile().getBlockLocations(start, len);
- BlockLocation[] locations = new BlockLocation[_locations.length];
- for (int i = 0; i < locations.length; i++){
- locations[i] = new BlockLocation();
- locations[i].setOffset(_locations[i].getOffset());
- locations[i].setLength(_locations[i].getLength());
- locations[i].setNames(_locations[i].getNames());
- locations[i].setHosts(_locations[i].getHosts());
- locations[i].setTopologyPaths(_locations[i].getTopology());
-
- }
- return locations;
- } catch(Exception e){
- throw new IOException(e);
- }
+ return dfs.getFileBlockLocations(path, start, len);
}
@Override
public FsStatus getFsStatus() throws AccessControlException, FileNotFoundException, IOException {
- return new FsStatus(1000000000, 1000, 1000000000 - 1000);
+ return dfs.getStatus();
}
@Override
public FileStatus[] listStatus(Path path) throws AccessControlException, FileNotFoundException, UnresolvedLinkException, IOException {
- try {
- CrailNode node = dfs.lookup(path.toUri().getRawPath()).get();
- Iterator<String> iter = node.asContainer().listEntries();
- ArrayList<FileStatus> statusList = new ArrayList<FileStatus>();
- while(iter.hasNext()){
- String filepath = iter.next();
- CrailNode directFile = dfs.lookup(filepath).get();
- if (directFile != null){
- FsPermission permission = FsPermission.getFileDefault();
- if (directFile.getType().isDirectory()) {
- permission = FsPermission.getDirDefault();
- }
- FileStatus status = new FileStatus(directFile.getCapacity(), directFile.getType().isContainer(), CrailConstants.SHADOW_REPLICATION, CrailConstants.BLOCK_SIZE, directFile.getModificationTime(), directFile.getModificationTime(), permission, CrailConstants.USER, CrailConstants.USER, new Path(filepath).makeQualified(this.getUri(), workingDir));
- statusList.add(status);
- }
- }
- FileStatus[] list = new FileStatus[statusList.size()];
- statusList.toArray(list);
- return list;
- } catch(Exception e){
- throw new FileNotFoundException(path.toUri().getRawPath());
- }
+ return dfs.listStatus(path);
}
@Override
public void setVerifyChecksum(boolean verifyChecksum) throws AccessControlException, IOException {
+ dfs.setVerifyChecksum(verifyChecksum);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/b31bfd78/hdfs/src/main/java/org/apache/crail/hdfs/CrailHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/hdfs/src/main/java/org/apache/crail/hdfs/CrailHadoopFileSystem.java b/hdfs/src/main/java/org/apache/crail/hdfs/CrailHadoopFileSystem.java
index 257d613..84c33c4 100644
--- a/hdfs/src/main/java/org/apache/crail/hdfs/CrailHadoopFileSystem.java
+++ b/hdfs/src/main/java/org/apache/crail/hdfs/CrailHadoopFileSystem.java
@@ -87,6 +87,7 @@ public class CrailHadoopFileSystem extends FileSystem {
}
public FSDataInputStream open(Path path, int bufferSize) throws IOException {
+ statistics.incrementReadOps(1);
CrailFile fileInfo = null;
try {
fileInfo = dfs.lookup(path.toUri().getRawPath()).get().asFile();
@@ -101,6 +102,7 @@ public class CrailHadoopFileSystem extends FileSystem {
public FSDataOutputStream create(Path path, FsPermission permission,
boolean overwrite, int bufferSize, short replication,
long blockSize, Progressable progress) throws IOException {
+ statistics.incrementWriteOps(1);
CrailFile fileInfo = null;
try {
fileInfo = dfs.create(path.toUri().getRawPath(), CrailNodeType.DATAFILE, CrailStorageClass.PARENT, CrailLocationClass.PARENT, true).get().asFile();
@@ -147,6 +149,7 @@ public class CrailHadoopFileSystem extends FileSystem {
@Override
public boolean rename(Path src, Path dst) throws IOException {
try {
+ statistics.incrementWriteOps(1);
CrailNode file = dfs.rename(src.toUri().getRawPath(), dst.toUri().getRawPath()).get();
if (file != null){
file.syncDir();
@@ -160,6 +163,7 @@ public class CrailHadoopFileSystem extends FileSystem {
@Override
public boolean delete(Path path, boolean recursive) throws IOException {
try {
+ statistics.incrementWriteOps(1);
CrailNode file = dfs.delete(path.toUri().getRawPath(), recursive).get();
if (file != null){
file.syncDir();
@@ -209,6 +213,7 @@ public class CrailHadoopFileSystem extends FileSystem {
@Override
public boolean mkdirs(Path path, FsPermission permission) throws IOException {
try {
+ statistics.incrementWriteOps(1);
CrailDirectory file = dfs.create(path.toUri().getRawPath(), CrailNodeType.DIRECTORY, CrailStorageClass.PARENT, CrailLocationClass.DEFAULT, true).get().asDirectory();
file.syncDir();
return true;
@@ -227,6 +232,7 @@ public class CrailHadoopFileSystem extends FileSystem {
@Override
public FileStatus getFileStatus(Path path) throws IOException {
+ statistics.incrementReadOps(1);
CrailNode directFile = null;
try {
directFile = dfs.lookup(path.toUri().getRawPath()).get();
@@ -246,27 +252,13 @@ public class CrailHadoopFileSystem extends FileSystem {
@Override
public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
- try {
- CrailBlockLocation[] _locations = dfs.lookup(file.getPath().toUri().getRawPath()).get().asFile().getBlockLocations(start, len);
- BlockLocation[] locations = new BlockLocation[_locations.length];
- for (int i = 0; i < locations.length; i++){
- locations[i] = new BlockLocation();
- locations[i].setOffset(_locations[i].getOffset());
- locations[i].setLength(_locations[i].getLength());
- locations[i].setNames(_locations[i].getNames());
- locations[i].setHosts(_locations[i].getHosts());
- locations[i].setTopologyPaths(_locations[i].getTopology());
-
- }
- return locations;
- } catch(Exception e){
- throw new IOException(e);
- }
+ return this.getFileBlockLocations(file.getPath(), start, len);
}
@Override
public BlockLocation[] getFileBlockLocations(Path path, long start, long len) throws IOException {
try {
+ statistics.incrementReadOps(1);
CrailBlockLocation[] _locations = dfs.lookup(path.toUri().getRawPath()).get().asFile().getBlockLocations(start, len);
BlockLocation[] locations = new BlockLocation[_locations.length];
for (int i = 0; i < locations.length; i++){
[2/2] incubator-crail git commit: Make sure statistics are updated
across all classes in the Crail HDFS module
Posted by ps...@apache.org.
Make sure statistics are updated across all classes in the Crail HDFS
module
Previously, statistics were only captured in HDFSOutputStream and parts
of the CrailHadoopFileSystem.
https://issues.apache.org/jira/browse/CRAIL-26
Signed-off-by: Patrick Stuedi <ps...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-crail/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crail/commit/90703f29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crail/tree/90703f29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crail/diff/90703f29
Branch: refs/heads/master
Commit: 90703f297ce4c5bc3eb7820101861b480291c27f
Parents: b31bfd7
Author: Patrick Stuedi <ps...@apache.org>
Authored: Mon Jun 25 13:46:45 2018 +0200
Committer: Patrick Stuedi <ps...@apache.org>
Committed: Mon Jun 25 13:46:45 2018 +0200
----------------------------------------------------------------------
.../apache/crail/hdfs/CrailHDFSInputStream.java | 54 ++++++++++++++++----
.../crail/hdfs/CrailHadoopFileSystem.java | 2 +-
2 files changed, 44 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/90703f29/hdfs/src/main/java/org/apache/crail/hdfs/CrailHDFSInputStream.java
----------------------------------------------------------------------
diff --git a/hdfs/src/main/java/org/apache/crail/hdfs/CrailHDFSInputStream.java b/hdfs/src/main/java/org/apache/crail/hdfs/CrailHDFSInputStream.java
index d90ce81..877e0f0 100644
--- a/hdfs/src/main/java/org/apache/crail/hdfs/CrailHDFSInputStream.java
+++ b/hdfs/src/main/java/org/apache/crail/hdfs/CrailHDFSInputStream.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.fs.FileSystem.Statistics;
import org.slf4j.Logger;
@@ -36,9 +37,10 @@ public class CrailHDFSInputStream extends FSDataInputStream {
private static final Logger LOG = CrailUtils.getLogger();
private CrailBufferedInputStream inputStream;
+ private Statistics stats;
- public CrailHDFSInputStream(CrailBufferedInputStream stream) {
- super(new CrailSeekable(stream));
+ public CrailHDFSInputStream(CrailBufferedInputStream stream, Statistics stats) {
+ super(new CrailSeekable(stream, stats));
LOG.info("new HDFS stream");
this.inputStream = stream;
}
@@ -50,13 +52,17 @@ public class CrailHDFSInputStream extends FSDataInputStream {
@Override
public int read(ByteBuffer buf) throws IOException {
- return inputStream.read(buf);
+ int res = inputStream.read(buf);
+ updateStats(res);
+ return res;
}
@Override
public int read(long position, byte[] buffer, int offset, int length)
throws IOException {
- return inputStream.read(position, buffer, offset, length);
+ int res = inputStream.read(position, buffer, offset, length);
+ updateStats(res);
+ return res;
}
@Override
@@ -84,7 +90,9 @@ public class CrailHDFSInputStream extends FSDataInputStream {
@Override
public int read() throws IOException {
- return inputStream.read();
+ int res = inputStream.read();
+ updateStats(Integer.BYTES);
+ return res;
}
@Override
@@ -92,26 +100,40 @@ public class CrailHDFSInputStream extends FSDataInputStream {
return inputStream.available();
}
+ private void updateStats(long len) {
+ if (stats != null && len > 0) {
+ stats.incrementBytesRead(len);
+ }
+ }
+
public static class CrailSeekable extends InputStream implements Seekable, PositionedReadable, ByteBufferReadable {
private CrailBufferedInputStream inputStream;
+ private Statistics stats;
- public CrailSeekable(CrailBufferedInputStream inputStream) {
+ public CrailSeekable(CrailBufferedInputStream inputStream, Statistics stats) {
this.inputStream = inputStream;
+ this.stats = stats;
}
@Override
public int read() throws IOException {
- return inputStream.read();
+ int value = inputStream.read();
+ updateStats(Integer.BYTES);
+ return value;
}
@Override
public int read(byte[] b) throws IOException {
- return inputStream.read(b);
+ int res = inputStream.read(b);
+ updateStats(Integer.BYTES);
+ return res;
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
- return inputStream.read(b, off, len);
+ int res = inputStream.read(b, off, len);
+ updateStats(Integer.BYTES);
+ return res;
}
@Override
@@ -146,13 +168,17 @@ public class CrailHDFSInputStream extends FSDataInputStream {
@Override
public int read(ByteBuffer dataBuf) throws IOException {
- return inputStream.read(dataBuf);
+ int res = inputStream.read(dataBuf);
+ updateStats(Integer.BYTES);
+ return res;
}
@Override
public int read(long position, byte[] buffer, int offset, int length)
throws IOException {
- return inputStream.read(position, buffer, offset, length);
+ int res = inputStream.read(position, buffer, offset, length);
+ updateStats(Integer.BYTES);
+ return res;
}
@Override
@@ -186,6 +212,12 @@ public class CrailHDFSInputStream extends FSDataInputStream {
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
return false;
+ }
+
+ private void updateStats(long len) {
+ if (stats != null && len > 0) {
+ stats.incrementBytesRead(len);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-crail/blob/90703f29/hdfs/src/main/java/org/apache/crail/hdfs/CrailHadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/hdfs/src/main/java/org/apache/crail/hdfs/CrailHadoopFileSystem.java b/hdfs/src/main/java/org/apache/crail/hdfs/CrailHadoopFileSystem.java
index 84c33c4..404d845 100644
--- a/hdfs/src/main/java/org/apache/crail/hdfs/CrailHadoopFileSystem.java
+++ b/hdfs/src/main/java/org/apache/crail/hdfs/CrailHadoopFileSystem.java
@@ -92,7 +92,7 @@ public class CrailHadoopFileSystem extends FileSystem {
try {
fileInfo = dfs.lookup(path.toUri().getRawPath()).get().asFile();
CrailBufferedInputStream inputStream = fileInfo.getBufferedInputStream(fileInfo.getCapacity());
- return new CrailHDFSInputStream(inputStream);
+ return new CrailHDFSInputStream(inputStream, statistics);
} catch (Exception e) {
throw new IOException(e);
}