You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2016/09/14 10:53:15 UTC
[10/35] ignite git commit: IGNITE-3890: IGFS: Simplified IgfsInputStream hierarchy.
IGNITE-3890: IGFS: Simplified IgfsInputStream hierarchy.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/16c5a715
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/16c5a715
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/16c5a715
Branch: refs/heads/ignite-3199-1
Commit: 16c5a715889322d31ed95a2a29206d3a909aa7b7
Parents: 43f65fe
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Sep 13 18:00:31 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Sep 13 18:00:31 2016 +0300
----------------------------------------------------------------------
.../internal/processors/igfs/IgfsAsyncImpl.java | 7 +--
.../ignite/internal/processors/igfs/IgfsEx.java | 10 ----
.../internal/processors/igfs/IgfsImpl.java | 11 +++--
.../processors/igfs/IgfsInputStreamAdapter.java | 51 --------------------
.../processors/igfs/IgfsInputStreamImpl.java | 17 +++++--
.../processors/igfs/IgfsIpcHandler.java | 7 +--
.../igfs/IgfsSecondaryFileSystemImpl.java | 2 +-
.../processors/igfs/IgfsMetricsSelfTest.java | 21 +++-----
.../internal/processors/igfs/IgfsMock.java | 8 +--
.../hadoop/igfs/HadoopIgfsInProc.java | 12 ++---
.../hadoop/HadoopCommandLineTest.java | 4 +-
11 files changed, 47 insertions(+), 103 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/16c5a715/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
index 07b070e..743601e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
@@ -25,6 +25,7 @@ import org.apache.ignite.IgniteFileSystem;
import org.apache.ignite.configuration.FileSystemConfiguration;
import org.apache.ignite.igfs.IgfsBlockLocation;
import org.apache.ignite.igfs.IgfsFile;
+import org.apache.ignite.igfs.IgfsInputStream;
import org.apache.ignite.igfs.IgfsMetrics;
import org.apache.ignite.igfs.IgfsMode;
import org.apache.ignite.igfs.IgfsOutputStream;
@@ -125,18 +126,18 @@ public class IgfsAsyncImpl extends AsyncSupportAdapter<IgniteFileSystem> impleme
}
/** {@inheritDoc} */
- @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize,
+ @Override public IgfsInputStream open(IgfsPath path, int bufSize,
int seqReadsBeforePrefetch) {
return igfs.open(path, bufSize, seqReadsBeforePrefetch);
}
/** {@inheritDoc} */
- @Override public IgfsInputStreamAdapter open(IgfsPath path) {
+ @Override public IgfsInputStream open(IgfsPath path) {
return igfs.open(path);
}
/** {@inheritDoc} */
- @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize) {
+ @Override public IgfsInputStream open(IgfsPath path, int bufSize) {
return igfs.open(path, bufSize);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/16c5a715/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
index 9760f43..05e157d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
@@ -49,16 +49,6 @@ public interface IgfsEx extends IgniteFileSystem {
*/
public IgfsPaths proxyPaths();
- /** {@inheritDoc} */
- @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize, int seqReadsBeforePrefetch)
- throws IgniteException;
-
- /** {@inheritDoc} */
- @Override public IgfsInputStreamAdapter open(IgfsPath path) throws IgniteException;
-
- /** {@inheritDoc} */
- @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize) throws IgniteException;
-
/**
* Gets global space counters.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/16c5a715/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 2720f24..2c1f0f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -36,6 +36,7 @@ import org.apache.ignite.events.IgfsEvent;
import org.apache.ignite.igfs.IgfsBlockLocation;
import org.apache.ignite.igfs.IgfsException;
import org.apache.ignite.igfs.IgfsFile;
+import org.apache.ignite.igfs.IgfsInputStream;
import org.apache.ignite.igfs.IgfsInvalidPathException;
import org.apache.ignite.igfs.IgfsMetrics;
import org.apache.ignite.igfs.IgfsMode;
@@ -948,24 +949,24 @@ public final class IgfsImpl implements IgfsEx {
}
/** {@inheritDoc} */
- @Override public IgfsInputStreamAdapter open(IgfsPath path) {
+ @Override public IgfsInputStream open(IgfsPath path) {
return open(path, cfg.getStreamBufferSize(), cfg.getSequentialReadsBeforePrefetch());
}
/** {@inheritDoc} */
- @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize) {
+ @Override public IgfsInputStream open(IgfsPath path, int bufSize) {
return open(path, bufSize, cfg.getSequentialReadsBeforePrefetch());
}
/** {@inheritDoc} */
- @Override public IgfsInputStreamAdapter open(final IgfsPath path, final int bufSize,
+ @Override public IgfsInputStream open(final IgfsPath path, final int bufSize,
final int seqReadsBeforePrefetch) {
A.notNull(path, "path");
A.ensure(bufSize >= 0, "bufSize >= 0");
A.ensure(seqReadsBeforePrefetch >= 0, "seqReadsBeforePrefetch >= 0");
- return safeOp(new Callable<IgfsInputStreamAdapter>() {
- @Override public IgfsInputStreamAdapter call() throws Exception {
+ return safeOp(new Callable<IgfsInputStream>() {
+ @Override public IgfsInputStream call() throws Exception {
if (log.isDebugEnabled())
log.debug("Open file for reading [path=" + path + ", bufSize=" + bufSize + ']');
http://git-wip-us.apache.org/repos/asf/ignite/blob/16c5a715/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamAdapter.java
deleted file mode 100644
index 07ab051..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamAdapter.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.igfs;
-
-import org.apache.ignite.igfs.IgfsInputStream;
-import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
-
-import java.io.IOException;
-
-/**
- * Implementation adapter providing necessary methods.
- */
-public abstract class IgfsInputStreamAdapter extends IgfsInputStream
- implements IgfsSecondaryFileSystemPositionedReadable {
- /** {@inheritDoc} */
- @Override public long length() {
- return fileInfo().length();
- }
-
- /**
- * Gets file info for opened file.
- *
- * @return File info.
- */
- public abstract IgfsEntryInfo fileInfo();
-
- /**
- * Reads bytes from given position.
- *
- * @param pos Position to read from.
- * @param len Number of bytes to read.
- * @return Array of chunks with respect to chunk file representation.
- * @throws IOException If read failed.
- */
- public abstract byte[][] readChunks(long pos, int len) throws IOException;
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/16c5a715/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java
index ca2f9f7..f20a423 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java
@@ -46,7 +46,7 @@ import java.util.concurrent.locks.ReentrantLock;
/**
* Input stream to read data from grid cache with separate blocks.
*/
-public class IgfsInputStreamImpl extends IgfsInputStreamAdapter {
+public class IgfsInputStreamImpl extends IgfsInputStream implements IgfsSecondaryFileSystemPositionedReadable {
/** Empty chunks result. */
private static final byte[][] EMPTY_CHUNKS = new byte[0][];
@@ -158,8 +158,8 @@ public class IgfsInputStreamImpl extends IgfsInputStreamAdapter {
}
/** {@inheritDoc} */
- @Override public IgfsEntryInfo fileInfo() {
- return fileInfo;
+ @Override public long length() {
+ return fileInfo.length();
}
/** {@inheritDoc} */
@@ -234,9 +234,16 @@ public class IgfsInputStreamImpl extends IgfsInputStreamAdapter {
return readFromStore(pos, buf, off, len);
}
- /** {@inheritDoc} */
+ /**
+ * Reads bytes from given position.
+ *
+ * @param pos Position to read from.
+ * @param len Number of bytes to read.
+ * @return Array of chunks with respect to chunk file representation.
+ * @throws IOException If read failed.
+ */
@SuppressWarnings("IfMayBeConditional")
- @Override public synchronized byte[][] readChunks(long pos, int len) throws IOException {
+ public synchronized byte[][] readChunks(long pos, int len) throws IOException {
// Readable bytes in the file, starting from the specified position.
long readable = fileInfo.length() - pos;
http://git-wip-us.apache.org/repos/asf/ignite/blob/16c5a715/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
index a888aff..6047604 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.igfs;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.igfs.IgfsInputStream;
import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration;
import org.apache.ignite.igfs.IgfsOutOfSpaceException;
import org.apache.ignite.igfs.IgfsOutputStream;
@@ -381,7 +382,7 @@ class IgfsIpcHandler implements IgfsServerHandler {
break;
case OPEN_READ: {
- IgfsInputStreamAdapter igfsIn = !req.flag() ? igfs.open(req.path(), bufSize) :
+ IgfsInputStream igfsIn = !req.flag() ? igfs.open(req.path(), bufSize) :
igfs.open(req.path(), bufSize, req.sequentialReadsBeforePrefetch());
long streamId = registerResource(ses, igfsIn);
@@ -390,7 +391,7 @@ class IgfsIpcHandler implements IgfsServerHandler {
log.debug("Opened IGFS input stream for file read [igfsName=" + igfs.name() + ", path=" +
req.path() + ", streamId=" + streamId + ", ses=" + ses + ']');
- res.response(new IgfsInputStreamDescriptor(streamId, igfsIn.fileInfo().length()));
+ res.response(new IgfsInputStreamDescriptor(streamId, igfsIn.length()));
break;
}
@@ -514,7 +515,7 @@ class IgfsIpcHandler implements IgfsServerHandler {
long pos = req.position();
int size = req.length();
- IgfsInputStreamAdapter igfsIn = (IgfsInputStreamAdapter)resource(ses, rsrcId);
+ IgfsInputStreamImpl igfsIn = (IgfsInputStreamImpl)resource(ses, rsrcId);
if (igfsIn == null)
throw new IgniteCheckedException("Input stream not found (already closed?): " + rsrcId);
http://git-wip-us.apache.org/repos/asf/ignite/blob/16c5a715/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
index 453682c..526e60d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
@@ -86,7 +86,7 @@ class IgfsSecondaryFileSystemImpl implements IgfsSecondaryFileSystemV2 {
/** {@inheritDoc} */
@Override public IgfsSecondaryFileSystemPositionedReadable open(IgfsPath path, int bufSize)
throws IgniteException {
- return igfs.open(path, bufSize);
+ return (IgfsSecondaryFileSystemPositionedReadable)igfs.open(path, bufSize);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/16c5a715/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetricsSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetricsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetricsSelfTest.java
index 38cfc00..8d742fb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetricsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetricsSelfTest.java
@@ -116,6 +116,7 @@ public class IgfsMetricsSelfTest extends IgfsCommonAbstractTest {
* @return Configuration.
* @throws Exception If failed.
*/
+ @SuppressWarnings("unchecked")
private IgniteConfiguration primaryConfiguration(int idx) throws Exception {
FileSystemConfiguration igfsCfg = new FileSystemConfiguration();
@@ -172,6 +173,7 @@ public class IgfsMetricsSelfTest extends IgfsCommonAbstractTest {
*
* @throws Exception If failed.
*/
+ @SuppressWarnings("unchecked")
private void startSecondary() throws Exception {
FileSystemConfiguration igfsCfg = new FileSystemConfiguration();
@@ -384,6 +386,7 @@ public class IgfsMetricsSelfTest extends IgfsCommonAbstractTest {
*
* @throws Exception If failed.
*/
+ @SuppressWarnings({"ResultOfMethodCallIgnored", "ConstantConditions"})
public void testBlockMetrics() throws Exception {
IgfsEx igfs = (IgfsEx)igfsPrimary[0];
@@ -424,7 +427,7 @@ public class IgfsMetricsSelfTest extends IgfsCommonAbstractTest {
checkBlockMetrics(initMetrics, igfs.metrics(), 0, 0, 0, 3, 0, blockSize * 3);
// Read data from the first file.
- IgfsInputStreamAdapter is = igfs.open(file1);
+ IgfsInputStream is = igfs.open(file1);
is.readFully(0, new byte[blockSize * 2]);
is.close();
@@ -432,7 +435,7 @@ public class IgfsMetricsSelfTest extends IgfsCommonAbstractTest {
// Read data from the second file with hits.
is = igfs.open(file2);
- is.readChunks(0, blockSize);
+ is.read(new byte[blockSize]);
is.close();
checkBlockMetrics(initMetrics, igfs.metrics(), 3, 0, blockSize * 3, 3, 0, blockSize * 3);
@@ -449,7 +452,7 @@ public class IgfsMetricsSelfTest extends IgfsCommonAbstractTest {
// Read remote file.
is = igfs.open(fileRemote);
- is.readChunks(0, rmtBlockSize);
+ is.read(new byte[rmtBlockSize]);
is.close();
checkBlockMetrics(initMetrics, igfs.metrics(), 4, 1, blockSize * 3 + rmtBlockSize, 3, 0, blockSize * 3);
@@ -459,7 +462,7 @@ public class IgfsMetricsSelfTest extends IgfsCommonAbstractTest {
// Read remote file again.
is = igfs.open(fileRemote);
- is.readChunks(0, rmtBlockSize);
+ is.read(new byte[rmtBlockSize]);
is.close();
checkBlockMetrics(initMetrics, igfs.metrics(), 5, 1, blockSize * 3 + rmtBlockSize * 2, 3, 0, blockSize * 3);
@@ -495,16 +498,6 @@ public class IgfsMetricsSelfTest extends IgfsCommonAbstractTest {
checkBlockMetrics(initMetrics, igfs.metrics(), 5, 1, blockSize * 3 + rmtBlockSize * 2, 5, 1,
blockSize * 7 / 2 + rmtBlockSize);
- // Now read partial block.
- // Read remote file again.
- is = igfs.open(file1);
- is.seek(blockSize * 2);
- is.readChunks(0, blockSize / 2);
- is.close();
-
- checkBlockMetrics(initMetrics, igfs.metrics(), 6, 1, blockSize * 7 / 2 + rmtBlockSize * 2, 5, 1,
- blockSize * 7 / 2 + rmtBlockSize);
-
igfs.resetMetrics();
metrics = igfs.metrics();
http://git-wip-us.apache.org/repos/asf/ignite/blob/16c5a715/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java
index 0138907..2b989c5 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java
@@ -23,6 +23,7 @@ import org.apache.ignite.IgniteFileSystem;
import org.apache.ignite.configuration.FileSystemConfiguration;
import org.apache.ignite.igfs.IgfsBlockLocation;
import org.apache.ignite.igfs.IgfsFile;
+import org.apache.ignite.igfs.IgfsInputStream;
import org.apache.ignite.igfs.IgfsMetrics;
import org.apache.ignite.igfs.IgfsMode;
import org.apache.ignite.igfs.IgfsOutputStream;
@@ -75,21 +76,22 @@ public class IgfsMock implements IgfsEx {
}
/** {@inheritDoc} */
- @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize, int seqReadsBeforePrefetch) throws IgniteException {
+ @Override public IgfsInputStream open(IgfsPath path, int bufSize, int seqReadsBeforePrefetch)
+ throws IgniteException {
throwUnsupported();
return null;
}
/** {@inheritDoc} */
- @Override public IgfsInputStreamAdapter open(IgfsPath path) throws IgniteException {
+ @Override public IgfsInputStream open(IgfsPath path) throws IgniteException {
throwUnsupported();
return null;
}
/** {@inheritDoc} */
- @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize) throws IgniteException {
+ @Override public IgfsInputStream open(IgfsPath path, int bufSize) throws IgniteException {
throwUnsupported();
return null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/16c5a715/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
index f426243..3220538 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsInProc.java
@@ -28,6 +28,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.igfs.IgfsBlockLocation;
import org.apache.ignite.igfs.IgfsFile;
+import org.apache.ignite.igfs.IgfsInputStream;
import org.apache.ignite.igfs.IgfsOutputStream;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.igfs.IgfsPathSummary;
@@ -35,7 +36,6 @@ import org.apache.ignite.igfs.IgfsUserContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.igfs.IgfsEx;
import org.apache.ignite.internal.processors.igfs.IgfsHandshakeResponse;
-import org.apache.ignite.internal.processors.igfs.IgfsInputStreamAdapter;
import org.apache.ignite.internal.processors.igfs.IgfsStatus;
import org.apache.ignite.internal.processors.igfs.IgfsUtils;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -316,9 +316,9 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
try {
return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
@Override public HadoopIgfsStreamDelegate apply() {
- IgfsInputStreamAdapter stream = igfs.open(path, bufSize);
+ IgfsInputStream stream = igfs.open(path, bufSize);
- return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.fileInfo().length());
+ return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.length());
}
});
}
@@ -336,9 +336,9 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
try {
return IgfsUserContext.doAs(user, new IgniteOutClosure<HadoopIgfsStreamDelegate>() {
@Override public HadoopIgfsStreamDelegate apply() {
- IgfsInputStreamAdapter stream = igfs.open(path, bufSize, seqReadsBeforePrefetch);
+ IgfsInputStream stream = igfs.open(path, bufSize, seqReadsBeforePrefetch);
- return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.fileInfo().length());
+ return new HadoopIgfsStreamDelegate(HadoopIgfsInProc.this, stream, stream.length());
}
});
}
@@ -394,7 +394,7 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
/** {@inheritDoc} */
@Override public IgniteInternalFuture<byte[]> readData(HadoopIgfsStreamDelegate delegate, long pos, int len,
@Nullable byte[] outBuf, int outOff, int outLen) {
- IgfsInputStreamAdapter stream = delegate.target();
+ IgfsInputStream stream = delegate.target();
try {
byte[] res = null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/16c5a715/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java
index 8dc2717..7ee318a 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopCommandLineTest.java
@@ -34,11 +34,11 @@ import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.hadoop.fs.IgniteHadoopFileSystemCounterWriter;
+import org.apache.ignite.igfs.IgfsInputStream;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobTracker;
import org.apache.ignite.internal.processors.igfs.IgfsEx;
-import org.apache.ignite.internal.processors.igfs.IgfsInputStreamAdapter;
import org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -398,7 +398,7 @@ public class HadoopCommandLineTest extends GridCommonAbstractTest {
"location '/result' as " + qry
));
- IgfsInputStreamAdapter in = igfs.open(new IgfsPath("/result/000000_0"));
+ IgfsInputStream in = igfs.open(new IgfsPath("/result/000000_0"));
byte[] buf = new byte[(int) in.length()];