You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/12/28 13:47:37 UTC
ignite git commit: IGNITE-3961: IGFS: Added
IgfsSecondaryFileSystem.affintiy() method. This closes #1114. This closes
#1252.
Repository: ignite
Updated Branches:
refs/heads/ignite-2.0 a61b0eaff -> 2df39a80d
IGNITE-3961: IGFS: Added IgfsSecondaryFileSystem.affintiy() method. This closes #1114. This closes #1252.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2df39a80
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2df39a80
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2df39a80
Branch: refs/heads/ignite-2.0
Commit: 2df39a80d80e2575be61a902ccd48615796fcde9
Parents: a61b0ea
Author: tledkov-gridgain <tl...@gridgain.com>
Authored: Wed Dec 28 16:47:24 2016 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Wed Dec 28 16:47:24 2016 +0300
----------------------------------------------------------------------
.../igfs/IgfsGroupDataBlocksKeyMapper.java | 17 +-
.../igfs/secondary/IgfsSecondaryFileSystem.java | 18 ++
.../local/LocalIgfsSecondaryFileSystem.java | 96 ++++++-
.../processors/igfs/IgfsBaseBlockKey.java | 42 +++
.../internal/processors/igfs/IgfsBlockKey.java | 26 +-
.../processors/igfs/IgfsBlockLocationImpl.java | 55 ++++
.../processors/igfs/IgfsDataManager.java | 12 +-
.../internal/processors/igfs/IgfsImpl.java | 12 +-
.../processors/igfs/IgfsKernalContextAware.java | 32 ---
.../igfs/IgfsSecondaryFileSystemImpl.java | 7 +
.../local/LocalFileSystemBlockKey.java | 103 +++++++
.../LocalFileSystemPositionedReadable.java | 65 +++++
...fsSecondaryFileSystemPositionedReadable.java | 65 -----
.../processors/resource/GridResourceIoc.java | 6 +-
.../resource/GridResourceProcessor.java | 31 ++-
.../ignite/resources/FileSystemResource.java | 62 +++++
.../processors/igfs/IgfsAbstractSelfTest.java | 2 +-
.../igfs/IgfsDualAbstractSelfTest.java | 14 +-
...fsLocalSecondaryFileSystemProxySelfTest.java | 81 ++++++
...gfsSecondaryFileSystemInjectionSelfTest.java | 270 +++++++++++++++++++
.../ignite/testsuites/IgniteIgfsTestSuite.java | 3 +
.../fs/IgniteHadoopIgfsSecondaryFileSystem.java | 21 +-
...doopIgfsSecondaryFileSystemDelegateImpl.java | 61 ++++-
.../impl/igfs/Hadoop1OverIgfsProxyTest.java | 67 +++++
.../testsuites/IgniteHadoopTestSuite.java | 2 +
25 files changed, 1031 insertions(+), 139 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/igfs/IgfsGroupDataBlocksKeyMapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsGroupDataBlocksKeyMapper.java b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsGroupDataBlocksKeyMapper.java
index b35b692..09143d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsGroupDataBlocksKeyMapper.java
+++ b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsGroupDataBlocksKeyMapper.java
@@ -18,9 +18,10 @@
package org.apache.ignite.igfs;
import org.apache.ignite.internal.processors.cache.GridCacheDefaultAffinityKeyMapper;
-import org.apache.ignite.internal.processors.igfs.IgfsBlockKey;
+import org.apache.ignite.internal.processors.igfs.IgfsBaseBlockKey;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
/**
* {@code IGFS} class providing ability to group file's data blocks together on one node.
@@ -84,15 +85,17 @@ public class IgfsGroupDataBlocksKeyMapper extends GridCacheDefaultAffinityKeyMap
/** {@inheritDoc} */
@Override public Object affinityKey(Object key) {
- if (key != null && IgfsBlockKey.class.equals(key.getClass())) {
- IgfsBlockKey blockKey = (IgfsBlockKey)key;
+ if (key instanceof IgfsBaseBlockKey) {
+ IgfsBaseBlockKey blockKey = (IgfsBaseBlockKey)key;
- if (blockKey.affinityKey() != null)
- return blockKey.affinityKey();
+ IgniteUuid affKey = blockKey.affinityKey();
- long grpId = blockKey.getBlockId() / grpSize;
+ if (affKey != null)
+ return affKey;
- return blockKey.getFileId().hashCode() + (int)(grpId ^ (grpId >>> 32));
+ long grpId = blockKey.blockId() / grpSize;
+
+ return blockKey.fileHash() + (int)(grpId ^ (grpId >>> 32));
}
return super.affinityKey(key);
http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
index 37c9c7d..76ba454 100644
--- a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
+++ b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
@@ -21,8 +21,10 @@ import java.io.OutputStream;
import java.util.Collection;
import java.util.Map;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.igfs.IgfsBlockLocation;
import org.apache.ignite.igfs.IgfsFile;
import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.igfs.IgfsPathNotFoundException;
import org.jetbrains.annotations.Nullable;
/**
@@ -202,4 +204,20 @@ public interface IgfsSecondaryFileSystem {
* @throws IgniteException If failed.
*/
public void setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteException;
+
+ /**
+ * Get affinity block locations for data blocks of the file. In case {@code maxLen} parameter is set and
+ * particular block location length is greater than this value, block locations will be split into smaller
+ * chunks.
+ *
+ * @param path File path to get affinity for.
+ * @param start Position in the file to start affinity resolution from.
+ * @param len Size of data in the file to resolve affinity for.
+ * @param maxLen Maximum length of a single returned block location.
+ * @return Affinity block locations.
+ * @throws IgniteException In case of error.
+ * @throws IgfsPathNotFoundException If path doesn't exist.
+ */
+ public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len, long maxLen)
+ throws IgniteException;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/igfs/secondary/local/LocalIgfsSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/local/LocalIgfsSecondaryFileSystem.java b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/local/LocalIgfsSecondaryFileSystem.java
index 18d31de..86f7387 100644
--- a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/local/LocalIgfsSecondaryFileSystem.java
+++ b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/local/LocalIgfsSecondaryFileSystem.java
@@ -17,10 +17,14 @@
package org.apache.ignite.igfs.secondary.local;
+import java.util.ArrayList;
import java.nio.file.attribute.BasicFileAttributeView;
import java.nio.file.attribute.FileTime;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.igfs.IgfsBlockLocation;
import org.apache.ignite.igfs.IgfsException;
import org.apache.ignite.igfs.IgfsFile;
import org.apache.ignite.igfs.IgfsPath;
@@ -29,14 +33,20 @@ import org.apache.ignite.igfs.IgfsPathIsNotDirectoryException;
import org.apache.ignite.igfs.IgfsPathNotFoundException;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
+import org.apache.ignite.internal.processors.igfs.IgfsDataManager;
+import org.apache.ignite.internal.processors.igfs.IgfsImpl;
+import org.apache.ignite.internal.processors.igfs.secondary.local.LocalFileSystemBlockKey;
import org.apache.ignite.internal.processors.igfs.IgfsUtils;
+import org.apache.ignite.internal.processors.igfs.IgfsBlockLocationImpl;
import org.apache.ignite.internal.processors.igfs.secondary.local.LocalFileSystemIgfsFile;
import org.apache.ignite.internal.processors.igfs.secondary.local.LocalFileSystemSizeVisitor;
import org.apache.ignite.internal.processors.igfs.secondary.local.LocalFileSystemUtils;
-import org.apache.ignite.internal.processors.igfs.secondary.local.LocalIgfsSecondaryFileSystemPositionedReadable;
+import org.apache.ignite.internal.processors.igfs.secondary.local.LocalFileSystemPositionedReadable;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lifecycle.LifecycleAware;
+import org.apache.ignite.resources.FileSystemResource;
+import org.apache.ignite.resources.LoggerResource;
import org.jetbrains.annotations.Nullable;
import java.io.File;
@@ -61,6 +71,16 @@ public class LocalIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem, Li
/** Path that will be added to each passed path. */
private String workDir;
+ /** Logger. */
+ @SuppressWarnings("unused")
+ @LoggerResource
+ private IgniteLogger log;
+
+ /** IGFS instance. */
+ @SuppressWarnings("unused")
+ @FileSystemResource
+ private IgfsImpl igfs;
+
/**
* Heuristically checks if exception was caused by invalid HDFS version and returns appropriate exception.
*
@@ -258,7 +278,7 @@ public class LocalIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem, Li
try {
FileInputStream in = new FileInputStream(fileForPath(path));
- return new LocalIgfsSecondaryFileSystemPositionedReadable(in, bufSize);
+ return new LocalFileSystemPositionedReadable(in, bufSize);
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to open file for read: " + path);
@@ -402,6 +422,78 @@ public class LocalIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem, Li
// No-op.
}
+ /** {@inheritDoc} */
+ @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len,
+ long maxLen) throws IgniteException {
+ File f = fileForPath(path);
+
+ if (!f.exists())
+ throw new IgfsPathNotFoundException("File not found: " + path);
+
+ // Create fake block & fake affinity for blocks
+ long blockSize = igfs.configuration().getBlockSize();
+
+ if (maxLen <= 0)
+ maxLen = Long.MAX_VALUE;
+
+ assert maxLen > 0 : "maxLen : " + maxLen;
+
+ long end = start + len;
+
+ Collection<IgfsBlockLocation> blocks = new ArrayList<>((int)(len / maxLen));
+
+ IgfsDataManager data = igfs.context().data();
+
+ Collection<ClusterNode> lastNodes = null;
+
+ long lastBlockIdx = -1;
+
+ IgfsBlockLocationImpl lastBlock = null;
+
+ for (long offset = start; offset < end; ) {
+ long blockIdx = offset / blockSize;
+
+ // Each step is min of maxLen and end of block.
+ long lenStep = Math.min(
+ maxLen - (lastBlock != null ? lastBlock.length() : 0),
+ (blockIdx + 1) * blockSize - offset);
+
+ lenStep = Math.min(lenStep, end - offset);
+
+ // Create fake affinity key to map blocks of secondary filesystem to nodes.
+ LocalFileSystemBlockKey affKey = new LocalFileSystemBlockKey(path, blockIdx);
+
+ if (blockIdx != lastBlockIdx) {
+ Collection<ClusterNode> nodes = data.affinityNodes(affKey);
+
+ if (!nodes.equals(lastNodes) && lastNodes != null && lastBlock != null) {
+ blocks.add(lastBlock);
+
+ lastBlock = null;
+ }
+
+ lastNodes = nodes;
+
+ lastBlockIdx = blockIdx;
+ }
+
+ if(lastBlock == null)
+ lastBlock = new IgfsBlockLocationImpl(offset, lenStep, lastNodes);
+ else
+ lastBlock.increaseLength(lenStep);
+
+ if (lastBlock.length() == maxLen || lastBlock.start() + lastBlock.length() == end) {
+ blocks.add(lastBlock);
+
+ lastBlock = null;
+ }
+
+ offset += lenStep;
+ }
+
+ return blocks;
+ }
+
/**
* Get work directory.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBaseBlockKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBaseBlockKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBaseBlockKey.java
new file mode 100644
index 0000000..05ef086
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBaseBlockKey.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Base interface for block keys that are used by the {@link IgfsGroupDataBlocksKeyMapper}.
+ */
+public interface IgfsBaseBlockKey {
+ /**
+ * @return Block ID.
+ */
+ public long blockId();
+
+ /**
+ * @return Hash based on a file identifier (path, ID, etc).
+ */
+ public int fileHash();
+
+ /**
+ * @return Block affinity key (if any).
+ */
+ @Nullable public IgniteUuid affinityKey();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockKey.java
index c366ae3..414f6b0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockKey.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockKey.java
@@ -44,8 +44,8 @@ import org.jetbrains.annotations.Nullable;
/**
* File's binary data block key.
*/
-@GridInternal
-public final class IgfsBlockKey implements Message, Externalizable, Binarylizable, Comparable<IgfsBlockKey> {
+public final class IgfsBlockKey implements IgfsBaseBlockKey, Message, Externalizable, Binarylizable,
+ Comparable<IgfsBlockKey> {
/** */
private static final long serialVersionUID = 0L;
@@ -93,13 +93,21 @@ public final class IgfsBlockKey implements Message, Externalizable, Binarylizabl
return fileId;
}
- /**
- * @return Block affinity key.
- */
- public IgniteUuid affinityKey() {
+ /** {@inheritDoc} */
+ @Override public IgniteUuid affinityKey() {
return affKey;
}
+ /** {@inheritDoc} */
+ @Override public long blockId() {
+ return blockId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int fileHash() {
+ return fileId.hashCode();
+ }
+
/**
* @return Evict exclude flag.
*/
@@ -107,12 +115,6 @@ public final class IgfsBlockKey implements Message, Externalizable, Binarylizabl
return evictExclude;
}
- /**
- * @return Block ID.
- */
- public long getBlockId() {
- return blockId;
- }
/** {@inheritDoc} */
@Override public void onAckReceived() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockLocationImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockLocationImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockLocationImpl.java
index 2d4a0af..3f5d9fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockLocationImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsBlockLocationImpl.java
@@ -24,6 +24,7 @@ import java.io.ObjectOutput;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
@@ -39,6 +40,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.NotNull;
/**
* File block location in the grid.
@@ -61,6 +63,7 @@ public class IgfsBlockLocationImpl implements IgfsBlockLocation, Externalizable,
private Collection<String> names;
/** */
+ @GridToStringInclude
private Collection<String> hosts;
/**
@@ -102,6 +105,44 @@ public class IgfsBlockLocationImpl implements IgfsBlockLocation, Externalizable,
}
/**
+ * @param start Start.
+ * @param len Length.
+ * @param block Block.
+ */
+ public IgfsBlockLocationImpl(long start, long len, IgfsBlockLocation block) {
+ assert start >= 0;
+ assert len > 0;
+
+ this.start = start;
+ this.len = len;
+
+ nodeIds = block.nodeIds();
+ names = block.names();
+ hosts = block.hosts();
+ }
+
+ /**
+ * @param start Start.
+ * @param len Length.
+ * @param names Collection of host:port addresses.
+ * @param hosts Collection of host:port addresses.
+ */
+ public IgfsBlockLocationImpl(long start, long len, Collection<String> names, Collection<String> hosts) {
+ assert start >= 0;
+ assert len > 0;
+ assert names != null && !names.isEmpty();
+ assert hosts != null && !hosts.isEmpty();
+
+ this.start = start;
+ this.len = len;
+
+ nodeIds = Collections.emptySet();
+
+ this.names = names;
+ this.hosts = hosts;
+ }
+
+ /**
* @return Start position.
*/
@Override public long start() {
@@ -116,6 +157,20 @@ public class IgfsBlockLocationImpl implements IgfsBlockLocation, Externalizable,
}
/**
+ * @param addLen Length to increase.
+ */
+ public void increaseLength(long addLen) {
+ len += addLen;
+ }
+
+ /**
+ * @param len Block length.
+ */
+ public void length(long len) {
+ this.len = len;
+ }
+
+ /**
* @return Node IDs.
*/
@Override public Collection<UUID> nodeIds() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index 4490a68..d6297b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -278,6 +278,16 @@ public class IgfsDataManager extends IgfsManager {
}
/**
+ * Maps affinity key to nodes.
+ *
+ * @param affinityKey Affinity key to map.
+ * @return Primary and backup nodes for this key.
+ */
+ public Collection<ClusterNode> affinityNodes(Object affinityKey) {
+ return dataCache.affinity().mapKeyToPrimaryAndBackups(affinityKey);
+ }
+
+ /**
* Creates new instance of explicit data streamer.
*
* @return New instance of data streamer.
@@ -1045,7 +1055,7 @@ public class IgfsDataManager extends IgfsManager {
// Create non-colocated key.
IgfsBlockKey key = new IgfsBlockKey(colocatedKey.getFileId(), null,
- colocatedKey.evictExclude(), colocatedKey.getBlockId());
+ colocatedKey.evictExclude(), colocatedKey.blockId());
try (IgniteInternalTx tx = dataCachePrj.txStartEx(PESSIMISTIC, REPEATABLE_READ)) {
// Lock keys.
http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 01e434f..59674f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -179,8 +179,11 @@ public final class IgfsImpl implements IgfsEx {
data = igfsCtx.data();
secondaryFs = cfg.getSecondaryFileSystem();
- if (secondaryFs instanceof IgfsKernalContextAware)
- ((IgfsKernalContextAware)secondaryFs).setKernalContext(igfsCtx.kernalContext());
+ if (secondaryFs != null) {
+ igfsCtx.kernalContext().resource().injectGeneric(secondaryFs);
+
+ igfsCtx.kernalContext().resource().injectFileSystem(secondaryFs, this);
+ }
if (secondaryFs instanceof LifecycleAware)
((LifecycleAware)secondaryFs).start();
@@ -635,7 +638,7 @@ public final class IgfsImpl implements IgfsEx {
IgfsFile file = secondaryFs.update(path, props);
if (file != null)
- return new IgfsFileImpl(secondaryFs.update(path, props), data.groupBlockSize());
+ return new IgfsFileImpl(file, data.groupBlockSize());
}
return null;
@@ -1263,6 +1266,9 @@ public final class IgfsImpl implements IgfsEx {
IgfsMode mode = resolveMode(path);
+ if (mode == PROXY)
+ return secondaryFs.affinity(path, start, len, maxLen);
+
// Check memory first.
IgfsEntryInfo info = meta.infoForPath(path);
http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsKernalContextAware.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsKernalContextAware.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsKernalContextAware.java
deleted file mode 100644
index 7f59db4..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsKernalContextAware.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.igfs;
-
-import org.apache.ignite.internal.GridKernalContext;
-
-/**
- * Indicates whether particular file system accepts kernal context.
- */
-public interface IgfsKernalContextAware {
- /**
- * Set kernal context.
- *
- * @param ctx Kernal context.
- */
- public void setKernalContext(GridKernalContext ctx);
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
index 4e14b46..1c135fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.igfs;
import org.apache.ignite.IgniteException;
+import org.apache.ignite.igfs.IgfsBlockLocation;
import org.apache.ignite.igfs.IgfsFile;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
@@ -121,4 +122,10 @@ class IgfsSecondaryFileSystemImpl implements IgfsSecondaryFileSystem {
@Override public void setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteException {
igfs.setTimes(path, accessTime, modificationTime);
}
+
+ /** {@inheritDoc} */
+ @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len,
+ long maxLen) throws IgniteException {
+ return igfs.affinity(path, start, len, maxLen);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalFileSystemBlockKey.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalFileSystemBlockKey.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalFileSystemBlockKey.java
new file mode 100644
index 0000000..e3990df
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalFileSystemBlockKey.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs.secondary.local;
+
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.processors.igfs.IgfsBaseBlockKey;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Block key (file path plus block ID) used to map blocks of a local secondary file system file to nodes.
+ */
+public final class LocalFileSystemBlockKey implements IgfsBaseBlockKey, Comparable<LocalFileSystemBlockKey> {
+ /** IGFS path. */
+ private IgfsPath path;
+
+ /** Block ID. */
+ private long blockId;
+
+ /**
+ * Constructs file's binary data block key.
+ *
+ * @param path IGFS path.
+ * @param blockId Block ID.
+ */
+ public LocalFileSystemBlockKey(IgfsPath path, long blockId) {
+ assert path != null;
+ assert blockId >= 0;
+
+ this.path = path;
+ this.blockId = blockId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long blockId() {
+ return blockId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int fileHash() {
+ return path.hashCode();
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid affinityKey() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int compareTo(@NotNull LocalFileSystemBlockKey o) {
+ int res = path.compareTo(o.path);
+
+ if (res != 0)
+ return res;
+
+ long v1 = blockId;
+ long v2 = o.blockId;
+
+ if (v1 != v2)
+ return v1 > v2 ? 1 : -1;
+
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return path.hashCode() + (int)(blockId ^ (blockId >>> 32));
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (o == this)
+ return true;
+
+ if (o == null || !(o instanceof LocalFileSystemBlockKey))
+ return false;
+
+ LocalFileSystemBlockKey that = (LocalFileSystemBlockKey)o;
+
+ return blockId == that.blockId && path.equals(that.path);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(LocalFileSystemBlockKey.class, this);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalFileSystemPositionedReadable.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalFileSystemPositionedReadable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalFileSystemPositionedReadable.java
new file mode 100644
index 0000000..6bdba95
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalFileSystemPositionedReadable.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs.secondary.local;
+
+import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
+
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+/**
+ * Positioned readable implementation for local secondary file system.
+ */
+public class LocalFileSystemPositionedReadable extends BufferedInputStream
+ implements IgfsSecondaryFileSystemPositionedReadable {
+ /** Last read position. */
+ private long lastReadPos;
+
+ /**
+ * Constructor.
+ *
+ * @param in Input stream.
+ * @param bufSize Buffer size.
+ */
+ public LocalFileSystemPositionedReadable(FileInputStream in, int bufSize) {
+ super(in, bufSize);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int read(long readPos, byte[] buf, int off, int len) throws IOException {
+ if (in == null)
+ throw new IOException("Stream is closed.");
+
+ if (readPos < lastReadPos || readPos + len > lastReadPos + this.buf.length) {
+ ((FileInputStream)in).getChannel().position(readPos);
+
+ pos = 0;
+ count = 0;
+ }
+
+ int bytesRead = read(buf, off, len);
+
+ if (bytesRead != -1) {
+ // Advance last read position only if we really read some bytes from the stream.
+ lastReadPos = readPos + bytesRead;
+ }
+
+ return bytesRead;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalIgfsSecondaryFileSystemPositionedReadable.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalIgfsSecondaryFileSystemPositionedReadable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalIgfsSecondaryFileSystemPositionedReadable.java
deleted file mode 100644
index ebf56ad..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/secondary/local/LocalIgfsSecondaryFileSystemPositionedReadable.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.igfs.secondary.local;
-
-import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
-
-import java.io.BufferedInputStream;
-import java.io.FileInputStream;
-import java.io.IOException;
-
-/**
- * Positioned readable interface for local secondary file system.
- */
-public class LocalIgfsSecondaryFileSystemPositionedReadable extends BufferedInputStream
- implements IgfsSecondaryFileSystemPositionedReadable {
- /** Last read position. */
- private long lastReadPos;
-
- /**
- * Constructor.
- *
- * @param in Input stream.
- * @param bufSize Buffer size.
- */
- public LocalIgfsSecondaryFileSystemPositionedReadable(FileInputStream in, int bufSize) {
- super(in, bufSize);
- }
-
- /** {@inheritDoc} */
- @Override public int read(long readPos, byte[] buf, int off, int len) throws IOException {
- if (in == null)
- throw new IOException("Stream is closed.");
-
- if (readPos < lastReadPos || readPos + len > lastReadPos + this.buf.length) {
- ((FileInputStream)in).getChannel().position(readPos);
-
- pos = 0;
- count = 0;
- }
-
- int bytesRead = read(buf, off, len);
-
- if (bytesRead != -1) {
- // Advance last read position only if we really read some bytes from the stream.
- lastReadPos = readPos + bytesRead;
- }
-
- return bytesRead;
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
index 0158973..07a4fff 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.resources.CacheNameResource;
import org.apache.ignite.resources.CacheStoreSessionResource;
+import org.apache.ignite.resources.FileSystemResource;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.JobContextResource;
import org.apache.ignite.resources.LoadBalancerResource;
@@ -511,7 +512,10 @@ public class GridResourceIoc {
JOB_CONTEXT(JobContextResource.class),
/** */
- CACHE_STORE_SESSION(CacheStoreSessionResource.class);
+ CACHE_STORE_SESSION(CacheStoreSessionResource.class),
+
+ /** */
+ FILESYSTEM_RESOURCE(FileSystemResource.class);
/** */
public final Class<? extends Annotation> clazz;
http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
index 84d07b6..bdfbe50 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
@@ -20,8 +20,8 @@ package org.apache.ignite.internal.processors.resource;
import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
-import java.util.Collection;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteFileSystem;
import org.apache.ignite.cache.store.CacheStoreSession;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.ComputeJobContext;
@@ -201,6 +201,26 @@ public class GridResourceProcessor extends GridProcessorAdapter {
}
/**
+ * Injects filesystem instance into given object.
+ *
+ * @param obj Object.
+ * @param igfs Ignite filesystem to inject.
+ * @return {@code True} if filesystem was injected.
+ * @throws IgniteCheckedException If failed to inject.
+ */
+ public boolean injectFileSystem(Object obj, IgniteFileSystem igfs) throws IgniteCheckedException {
+     assert obj != null;
+
+     if (log.isDebugEnabled())
+         log.debug("Injecting file system: " + obj);
+
+     // Unwrap Proxy object so that annotations are looked up on the real target.
+     obj = unwrapTarget(obj);
+
+     // No deployment is involved here; the file system instance itself is the injected value.
+     return inject(obj, GridResourceIoc.ResourceAnnotation.FILESYSTEM_RESOURCE, null, null, igfs);
+ }
+
+ /**
* @param obj Object to inject.
* @throws IgniteCheckedException If failed to inject.
*/
@@ -308,6 +328,10 @@ public class GridResourceProcessor extends GridProcessorAdapter {
res = new GridResourceJobContextInjector((ComputeJobContext)param);
break;
+ case FILESYSTEM_RESOURCE:
+ res = new GridResourceBasicInjector<>(param);
+ break;
+
default:
res = injectorByAnnotation[ann.ordinal()];
break;
@@ -318,6 +342,11 @@ public class GridResourceProcessor extends GridProcessorAdapter {
/**
* @param obj Object to inject.
+ * @param ann Annotation enum.
+ * @param dep Grid deployment object.
+ * @param depCls Grid deployment class.
+ * @param param Resource to inject.
+ * @return {@code True} if resource was injected.
* @throws IgniteCheckedException If failed to inject.
*/
private boolean inject(Object obj, GridResourceIoc.ResourceAnnotation ann, @Nullable GridDeployment dep,
http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/main/java/org/apache/ignite/resources/FileSystemResource.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/resources/FileSystemResource.java b/modules/core/src/main/java/org/apache/ignite/resources/FileSystemResource.java
new file mode 100644
index 0000000..e2aa06d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/resources/FileSystemResource.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.resources;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotates a field or a setter method for injection of the primary Ignite file system into a secondary
+ * file system implementation.
+ * <p>
+ * Here is how injection would typically happen:
+ * <pre name="code" class="java">
+ * public class MySecondaryFS implements IgfsSecondaryFileSystem {
+ *     ...
+ *     // Inject instance of primary filesystem.
+ *     &#64;FileSystemResource
+ *     private IgniteFileSystem igfs;
+ *     ...
+ * }
+ * </pre>
+ * or attach the same annotation to a setter method:
+ * <pre name="code" class="java">
+ * public class MySecondaryFS implements IgfsSecondaryFileSystem {
+ *     ...
+ *     private IgniteFileSystem igfs;
+ *     ...
+ *     // Inject instance of primary filesystem.
+ *     &#64;FileSystemResource
+ *     public void setIgfs(IgniteFileSystem igfs) {
+ *         this.igfs = igfs;
+ *     }
+ *     ...
+ * }
+ * </pre>
+ */
+@Documented
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.METHOD, ElementType.FIELD})
+public @interface FileSystemResource {
+ // No-op.
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
index 04f3c8e..d0b700e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
@@ -736,7 +736,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsAbstractBaseSelfTest {
if(!propertiesSupported())
return;
- if (dual && !(igfsSecondaryFileSystem instanceof IgfsSecondaryFileSystemImpl)) {
+ if (mode != PRIMARY && !(igfsSecondaryFileSystem instanceof IgfsSecondaryFileSystemImpl)) {
// In case of Hadoop dual mode only user name, group name, and permission properties are updated,
// an arbitrary named property is just ignored:
checkRootPropertyUpdate("foo", "moo", null);
http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java
index bea318d..7b83cfc 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java
@@ -35,6 +35,7 @@ import java.util.concurrent.CyclicBarrier;
import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC;
import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC;
+import static org.apache.ignite.igfs.IgfsMode.PROXY;
/**
* Tests for IGFS working in mode when remote file system exists: DUAL_SYNC, DUAL_ASYNC.
@@ -48,8 +49,6 @@ public abstract class IgfsDualAbstractSelfTest extends IgfsAbstractSelfTest {
*/
protected IgfsDualAbstractSelfTest(IgfsMode mode) {
super(mode);
-
- assert mode == DUAL_SYNC || mode == DUAL_ASYNC;
}
/** {@inheritDoc} */
@@ -72,10 +71,13 @@ public abstract class IgfsDualAbstractSelfTest extends IgfsAbstractSelfTest {
assert igfs.exists(p);
assert igfs.modeResolver().resolveMode(gg) == mode;
- assert igfs.modeResolver().resolveMode(new IgfsPath(gg, "sync")) == IgfsMode.DUAL_SYNC;
- assert igfs.modeResolver().resolveMode(new IgfsPath(gg, "async")) == IgfsMode.DUAL_ASYNC;
- assert igfs.modeResolver().resolveMode(new IgfsPath(gg, "primary")) == IgfsMode.PRIMARY;
- assert !igfsSecondary.exists("/ignite/primary"); // PRIMARY mode path must exist in upper level fs only.
+
+ if (mode != PROXY) {
+ assert igfs.modeResolver().resolveMode(new IgfsPath(gg, "sync")) == IgfsMode.DUAL_SYNC;
+ assert igfs.modeResolver().resolveMode(new IgfsPath(gg, "async")) == IgfsMode.DUAL_ASYNC;
+ assert igfs.modeResolver().resolveMode(new IgfsPath(gg, "primary")) == IgfsMode.PRIMARY;
+ assert !igfsSecondary.exists("/ignite/primary"); // PRIMARY mode path must exist in upper level fs only.
+ }
// All the child paths of "/ignite/" must be visible in listings:
assert igfs.listFiles(gg).size() == 3;
http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsLocalSecondaryFileSystemProxySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsLocalSecondaryFileSystemProxySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsLocalSecondaryFileSystemProxySelfTest.java
index e7f9bbb..e7e3ac8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsLocalSecondaryFileSystemProxySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsLocalSecondaryFileSystemProxySelfTest.java
@@ -24,6 +24,7 @@ import java.io.OutputStream;
import java.nio.file.Files;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.igfs.IgfsBlockLocation;
import org.apache.ignite.igfs.IgfsFile;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
@@ -58,6 +59,11 @@ public class IgfsLocalSecondaryFileSystemProxySelfTest extends IgfsProxySelfTest
/** */
private final File fileLinkSrc = new File(FS_WORK_DIR + File.separatorChar + "file");
+ /** {@inheritDoc} */
+ @Override protected int nodeCount() {
+ return 3;
+ }
+
/**
* Creates secondary filesystems.
* @return IgfsSecondaryFileSystem
@@ -215,6 +221,81 @@ public class IgfsLocalSecondaryFileSystemProxySelfTest extends IgfsProxySelfTest
}
/**
+ * @throws Exception If failed.
+ */
+ public void testAffinityMaxLen() throws Exception {
+     awaitPartitionMapExchange();
+
+     long fileSize = 32 * 1024 * 1024;
+
+     IgfsPath filePath = new IgfsPath("/file");
+
+     // Fill the file with whole chunks.
+     try (OutputStream os = igfs.create(filePath, true)) {
+         for (int i = 0; i < fileSize / chunk.length; ++i)
+             os.write(chunk);
+     }
+
+     Collection<IgfsBlockLocation> blocks;
+
+     // Loop-invariant: looked up once instead of on every iteration.
+     long grpBlockSize = igfs.context().data().groupBlockSize();
+
+     long len = igfs.info(filePath).length();
+     int start = 0;
+
+     // Check default maxLen (maxLen = 0): it must behave the same as an unlimited maxLen.
+     for (int i = 0; i < grpBlockSize / 1024; i++) {
+         Collection<IgfsBlockLocation> blocks0 =
+             igfs.affinity(filePath, start, len, 0);
+
+         blocks = igfs.affinity(filePath, start, len, Long.MAX_VALUE);
+
+         assertTrue(blocks0.size() > 1);
+         assertEquals(blocks0.size(), blocks.size());
+         assertEquals(start, F.first(blocks).start());
+         assertEquals(start + len, F.last(blocks).start() + F.last(blocks).length());
+         assertEquals(blocks0, blocks);
+
+         // Shrink the requested range from both ends.
+         len -= 1024 * 2;
+         start += 1024;
+     }
+
+     len = igfs.info(filePath).length();
+     start = 0;
+     long maxLen = grpBlockSize * 2;
+
+     // Different cases of start, len and maxLen
+     for (int i = 0; i < grpBlockSize / 1024; i++) {
+         blocks = igfs.affinity(filePath, start, len, maxLen);
+
+         assertEquals(start, F.first(blocks).start());
+         assertEquals(start + len, F.last(blocks).start() + F.last(blocks).length());
+
+         long totalLen = 0;
+
+         for (IgfsBlockLocation block : blocks) {
+             totalLen += block.length();
+
+             assert block.length() <= maxLen : "block.length() <= maxLen. [block.length=" + block.length()
+                 + ", maxLen=" + maxLen + ']';
+
+             assert block.length() + block.start() <= start + len : "block.length() + block.start() < start + len. [block.length=" + block.length()
+                 + ", block.start()=" + block.start() + ", start=" + start +", len=" + len + ']';
+
+             // Every other block must lie entirely before or entirely after this one.
+             for (IgfsBlockLocation block0 : blocks)
+                 if (!block0.equals(block))
+                     assert block.start() < block0.start() && block.start() + block.length() <= block0.start() ||
+                         block.start() > block0.start() && block0.start() + block0.length() <= block.start()
+                         : "Blocks cross each other: block0=" + block + ", block1= " + block0;
+         }
+
+         assert totalLen == len : "Summary length of blocks must be: " + len + " actual: " + totalLen;
+
+         // Shrink the requested range from both ends and tighten the per-block limit.
+         len -= 1024 * 2;
+         start += 1024;
+         maxLen -= grpBlockSize * 2 / 1024;
+     }
+ }
+
+ /**
*
* @throws Exception If failed.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemInjectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemInjectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemInjectionSelfTest.java
new file mode 100644
index 0000000..d4187cb
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemInjectionSelfTest.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import java.io.OutputStream;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteFileSystem;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.FileSystemConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.igfs.IgfsBlockLocation;
+import org.apache.ignite.igfs.IgfsFile;
+import org.apache.ignite.igfs.IgfsMode;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
+import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.resources.FileSystemResource;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+
+/**
+ * Tests for resource injection to secondary file system.
+ */
+public class IgfsSecondaryFileSystemInjectionSelfTest extends GridCommonAbstractTest {
+ /** IGFS name. */
+ protected static final String IGFS_NAME = "igfs-test";
+
+ /** Test implementation of secondary filesystem. */
+ private TestBaseSecondaryFsMock secondary;
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ G.stopAll(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ FileSystemConfiguration igfsCfg = new FileSystemConfiguration();
+
+ igfsCfg.setDataCacheName("dataCache");
+ igfsCfg.setMetaCacheName("metaCache");
+ igfsCfg.setName(IGFS_NAME);
+ igfsCfg.setDefaultMode(IgfsMode.DUAL_SYNC);
+ igfsCfg.setSecondaryFileSystem(secondary);
+
+ CacheConfiguration dataCacheCfg = defaultCacheConfiguration();
+
+ dataCacheCfg.setName("dataCache");
+ dataCacheCfg.setCacheMode(PARTITIONED);
+ dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+ dataCacheCfg.setAtomicityMode(TRANSACTIONAL);
+
+ CacheConfiguration metaCacheCfg = defaultCacheConfiguration();
+
+ metaCacheCfg.setName("metaCache");
+ metaCacheCfg.setCacheMode(REPLICATED);
+ metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+ metaCacheCfg.setAtomicityMode(TRANSACTIONAL);
+
+ cfg.setFileSystemConfiguration(igfsCfg);
+ cfg.setCacheConfiguration(metaCacheCfg, dataCacheCfg);
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings({"UnusedDeclaration"})
+ public void testInjectPrimaryByField() throws Exception {
+ secondary = new TestBaseSecondaryFsMock() {
+ @FileSystemResource
+ private IgfsImpl igfs;
+
+ @LoggerResource
+ private IgniteLogger log;
+
+ @IgniteInstanceResource
+ private Ignite ig;
+
+ @Override boolean checkInjection(Ignite ignite, IgniteFileSystem primary) {
+ return igfs == primary && log instanceof IgniteLogger && ig == ignite;
+ }
+ };
+
+ Ignite ig = startGrid(0);
+
+ IgniteFileSystem igfs = ig.fileSystem(IGFS_NAME);
+
+ assert secondary.checkInjection(ig, igfs);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @SuppressWarnings({"UnusedDeclaration"})
+ public void testInjectPrimaryByMethods() throws Exception {
+ secondary = new TestBaseSecondaryFsMock() {
+ /** Ignite instance. */
+ private Ignite ig;
+
+ /** IGFS instance. */
+ private IgniteFileSystem igfs;
+
+ /** Logger injected flag */
+ private boolean logSet;
+
+ /**
+ * @param igfs Primary IGFS.
+ */
+ @FileSystemResource
+ void setPrimaryIgfs(IgfsImpl igfs) {
+ this.igfs = igfs;
+ }
+
+ /**
+ * @param log Ignite logger.
+ */
+ @LoggerResource
+ void setIgLogger(IgniteLogger log) {
+ logSet = log instanceof IgniteLogger;
+ }
+
+ /**
+ * @param ig Ignite instance.
+ */
+ @IgniteInstanceResource
+ void setIgniteInst(Ignite ig) {
+ this.ig = ig;
+ }
+
+ @Override boolean checkInjection(Ignite ignite, IgniteFileSystem primary) {
+ return ignite == ig && primary == igfs && logSet;
+ }
+ };
+
+ Ignite ig = startGrid(0);
+
+ IgniteFileSystem igfs = ig.fileSystem(IGFS_NAME);
+
+ assert secondary.checkInjection(ig, igfs);
+ }
+
+ /**
+ *
+ */
+ private static abstract class TestBaseSecondaryFsMock implements IgfsSecondaryFileSystem {
+
+ /** {@inheritDoc} */
+ @Override public boolean exists(IgfsPath path) {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteException {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void rename(IgfsPath src, IgfsPath dest) throws IgniteException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean delete(IgfsPath path, boolean recursive) throws IgniteException {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void mkdirs(IgfsPath path) throws IgniteException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) throws IgniteException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteException {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteException {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public IgfsSecondaryFileSystemPositionedReadable open(IgfsPath path, int bufSize) throws IgniteException {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public OutputStream create(IgfsPath path, boolean overwrite) throws IgniteException {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public OutputStream create(IgfsPath path, int bufSize, boolean overwrite, int replication, long blockSize,
+ @Nullable Map<String, String> props) throws IgniteException {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public OutputStream append(IgfsPath path, int bufSize, boolean create,
+ @Nullable Map<String, String> props) throws IgniteException {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsFile info(IgfsPath path) throws IgniteException {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long usedSpaceSize() throws IgniteException {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len,
+ long maxLen) throws IgniteException {
+ return null;
+ }
+
+ /**
+ * @param ignite Ignite instance.
+ * @param primary Primary IGFS.
+ * @return {@code True} if injection is correct.
+ */
+ abstract boolean checkInjection(Ignite ignite, IgniteFileSystem primary);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java
index 775c2ce..76ed440 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteIgfsTestSuite.java
@@ -59,6 +59,7 @@ import org.apache.ignite.internal.processors.igfs.IgfsProcessorSelfTest;
import org.apache.ignite.internal.processors.igfs.IgfsProcessorValidationSelfTest;
import org.apache.ignite.internal.processors.igfs.IgfsProxySelfTest;
import org.apache.ignite.internal.processors.igfs.IgfsLocalSecondaryFileSystemProxySelfTest;
+import org.apache.ignite.internal.processors.igfs.IgfsSecondaryFileSystemInjectionSelfTest;
import org.apache.ignite.internal.processors.igfs.IgfsServerManagerIpcEndpointRegistrationOnWindowsSelfTest;
import org.apache.ignite.internal.processors.igfs.IgfsSizeSelfTest;
import org.apache.ignite.internal.processors.igfs.IgfsStartCacheTest;
@@ -167,6 +168,8 @@ public class IgniteIgfsTestSuite extends TestSuite {
suite.addTestSuite(IgfsAtomicPrimaryOffheapTieredSelfTest.class);
suite.addTestSuite(IgfsAtomicPrimaryOffheapValuesSelfTest.class);
+ suite.addTestSuite(IgfsSecondaryFileSystemInjectionSelfTest.class);
+
return suite;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
index d9215db..674cca7 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
@@ -20,20 +20,22 @@ package org.apache.ignite.hadoop.fs;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteFileSystem;
+import org.apache.ignite.igfs.IgfsBlockLocation;
import org.apache.ignite.igfs.IgfsFile;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.igfs.IgfsUserContext;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader;
import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
import org.apache.ignite.internal.processors.hadoop.HadoopPayloadAware;
import org.apache.ignite.internal.processors.hadoop.delegate.HadoopDelegateUtils;
import org.apache.ignite.internal.processors.hadoop.delegate.HadoopIgfsSecondaryFileSystemDelegate;
-import org.apache.ignite.internal.processors.igfs.IgfsKernalContextAware;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.lifecycle.LifecycleAware;
+import org.apache.ignite.resources.IgniteInstanceResource;
import org.jetbrains.annotations.Nullable;
import java.io.OutputStream;
@@ -46,8 +48,8 @@ import java.util.concurrent.Callable;
* <p>
* Target {@code FileSystem}'s are created on per-user basis using passed {@link HadoopFileSystemFactory}.
*/
-public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem, IgfsKernalContextAware,
- LifecycleAware, HadoopPayloadAware {
+public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem, LifecycleAware,
+ HadoopPayloadAware {
/** The default user name. It is used if no user context is set. */
private String dfltUsrName;
@@ -245,8 +247,17 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
}
/** {@inheritDoc} */
- @Override public void setKernalContext(GridKernalContext ctx) {
- this.ctx = ctx;
+ @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len,
+ long maxLen) throws IgniteException {
+ return target.affinity(path, start, len, maxLen);
+ }
+
+ /**
+ * @param ignite Ignite instance.
+ */
+ @IgniteInstanceResource
+ public void setIgniteInstance(IgniteEx ignite) {
+ ctx = ignite.context();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java
index e336fad..fe6492e 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java
@@ -17,6 +17,9 @@
package org.apache.ignite.internal.processors.hadoop.impl.delegate;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.ParentNotDirectoryException;
@@ -28,6 +31,7 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.hadoop.fs.CachingHadoopFileSystemFactory;
import org.apache.ignite.hadoop.fs.HadoopFileSystemFactory;
import org.apache.ignite.hadoop.fs.IgniteHadoopIgfsSecondaryFileSystem;
+import org.apache.ignite.igfs.IgfsBlockLocation;
import org.apache.ignite.igfs.IgfsDirectoryNotEmptyException;
import org.apache.ignite.igfs.IgfsException;
import org.apache.ignite.igfs.IgfsFile;
@@ -42,6 +46,7 @@ import org.apache.ignite.internal.processors.hadoop.delegate.HadoopFileSystemFac
import org.apache.ignite.internal.processors.hadoop.delegate.HadoopIgfsSecondaryFileSystemDelegate;
import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsProperties;
import org.apache.ignite.internal.processors.hadoop.impl.igfs.HadoopIgfsSecondaryFileSystemPositionedReadable;
+import org.apache.ignite.internal.processors.igfs.IgfsBlockLocationImpl;
import org.apache.ignite.internal.processors.igfs.IgfsEntryInfo;
import org.apache.ignite.internal.processors.igfs.IgfsFileImpl;
import org.apache.ignite.internal.processors.igfs.IgfsUtils;
@@ -104,12 +109,17 @@ public class HadoopIgfsSecondaryFileSystemDelegateImpl implements HadoopIgfsSeco
final FileSystem fileSys = fileSystemForUser();
+ Path hadoopPath = convert(path);
+
try {
+ if (!fileSys.exists(hadoopPath))
+ return null;
+
if (props0.userName() != null || props0.groupName() != null)
- fileSys.setOwner(convert(path), props0.userName(), props0.groupName());
+ fileSys.setOwner(hadoopPath, props0.userName(), props0.groupName());
if (props0.permission() != null)
- fileSys.setPermission(convert(path), props0.permission());
+ fileSys.setPermission(hadoopPath, props0.permission());
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to update file properties [path=" + path + "]");
@@ -266,7 +276,14 @@ public class HadoopIgfsSecondaryFileSystemDelegateImpl implements HadoopIgfsSeco
@Override public OutputStream append(IgfsPath path, int bufSize, boolean create,
@Nullable Map<String, String> props) {
try {
- return fileSystemForUser().append(convert(path), bufSize);
+ Path hadoopPath = convert(path);
+
+ FileSystem fs = fileSystemForUser();
+
+ if (create && !fs.exists(hadoopPath))
+ return fs.create(hadoopPath, false, bufSize);
+ else
+ return fs.append(convert(path), bufSize);
}
catch (IOException e) {
throw handleSecondaryFsError(e, "Failed to append file [path=" + path + ", bufSize=" + bufSize + "]");
@@ -371,6 +388,24 @@ public class HadoopIgfsSecondaryFileSystemDelegateImpl implements HadoopIgfsSeco
}
/** {@inheritDoc} */
+ @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len,
+ long maxLen) throws IgniteException {
+ try {
+ BlockLocation[] hadoopBlocks = fileSystemForUser().getFileBlockLocations(convert(path), start, len);
+
+ List<IgfsBlockLocation> blks = new ArrayList<>(hadoopBlocks.length);
+
+ for (int i = 0; i < hadoopBlocks.length; ++i)
+ blks.add(convertBlockLocation(hadoopBlocks[i]));
+
+ return blks;
+ }
+ catch (IOException e) {
+ throw handleSecondaryFsError(e, "Failed affinity for path: " + path);
+ }
+ }
+
+ /** {@inheritDoc} */
public void start() {
factory.start();
}
@@ -393,6 +428,25 @@ public class HadoopIgfsSecondaryFileSystemDelegateImpl implements HadoopIgfsSeco
}
/**
+ * Convert Hadoop affinity block location into IGFS affinity block location.
+ *
+ * @param block Hadoop affinity block location.
+ * @return IGFS affinity block location.
+ */
+ private IgfsBlockLocation convertBlockLocation(BlockLocation block) {
+ try {
+ String[] names = block.getNames();
+ String[] hosts = block.getHosts();
+
+ return new IgfsBlockLocationImpl(
+ block.getOffset(), block.getLength(),
+ Arrays.asList(names), Arrays.asList(hosts));
+ } catch (IOException e) {
+ throw handleSecondaryFsError(e, "Failed convert block location: " + block);
+ }
+ }
+
+ /**
* Heuristically checks if exception was caused by invalid HDFS version and returns appropriate exception.
*
* @param e Exception to check.
@@ -406,6 +460,7 @@ public class HadoopIgfsSecondaryFileSystemDelegateImpl implements HadoopIgfsSeco
/**
* Cast IO exception to IGFS exception.
*
+ * @param msg Error message.
* @param e IO exception.
* @return IGFS exception.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/Hadoop1OverIgfsProxyTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/Hadoop1OverIgfsProxyTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/Hadoop1OverIgfsProxyTest.java
new file mode 100644
index 0000000..c7c792d
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/Hadoop1OverIgfsProxyTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl.igfs;
+
+import java.io.OutputStream;
+import java.util.Collection;
+import org.apache.ignite.igfs.IgfsBlockLocation;
+import org.apache.ignite.igfs.IgfsMode;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * PROXY mode test.
+ */
+public class Hadoop1OverIgfsProxyTest extends Hadoop1DualAbstractTest {
+ /**
+ * Constructor.
+ */
+ public Hadoop1OverIgfsProxyTest() {
+ super(IgfsMode.PROXY);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testAffinity() throws Exception {
+ long fileSize = 32 * 1024 * 1024;
+
+ IgfsPath filePath = new IgfsPath("/file");
+
+ try (OutputStream os = igfs.create(filePath, true)) {
+ for(int i = 0; i < fileSize / chunk.length; ++i)
+ os.write(chunk);
+ }
+
+ long len = igfs.info(filePath).length();
+ int start = 0;
+
+ // Check default maxLen (maxLen = 0)
+ for (int i = 0; i < igfs.context().data().groupBlockSize() / 1024; i++) {
+ Collection<IgfsBlockLocation> blocks = igfs.affinity(filePath, start, len);
+
+ assertEquals(F.first(blocks).start(), start);
+ assertEquals(start + len, F.last(blocks).start() + F.last(blocks).length());
+
+ len -= 1024 * 2;
+ start += 1024;
+ }
+
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/2df39a80/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
index 6046cc1..01893fb 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.hadoop.impl.client.HadoopClientProt
import org.apache.ignite.internal.processors.hadoop.impl.client.HadoopClientProtocolSelfTest;
import org.apache.ignite.internal.processors.hadoop.impl.HadoopTxConfigCacheTest;
import org.apache.ignite.internal.processors.hadoop.impl.fs.KerberosHadoopFileSystemFactorySelfTest;
+import org.apache.ignite.internal.processors.hadoop.impl.igfs.Hadoop1OverIgfsProxyTest;
import org.apache.ignite.internal.processors.hadoop.impl.util.BasicUserNameMapperSelfTest;
import org.apache.ignite.internal.processors.hadoop.impl.util.ChainedUserNameMapperSelfTest;
import org.apache.ignite.internal.processors.hadoop.impl.util.KerberosUserNameMapperSelfTest;
@@ -136,6 +137,7 @@ public class IgniteHadoopTestSuite extends TestSuite {
suite.addTest(new TestSuite(ldr.loadClass(Hadoop1OverIgfsDualSyncTest.class.getName())));
suite.addTest(new TestSuite(ldr.loadClass(Hadoop1OverIgfsDualAsyncTest.class.getName())));
+ suite.addTest(new TestSuite(ldr.loadClass(Hadoop1OverIgfsProxyTest.class.getName())));
suite.addTest(new TestSuite(ldr.loadClass(HadoopFIleSystemFactorySelfTest.class.getName())));