You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/06/05 14:42:48 UTC
[3/7] ignite git commit: WIP on Juawei optos.
WIP on Juawei optos.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/79644c16
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/79644c16
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/79644c16
Branch: refs/heads/ignite-3246
Commit: 79644c16cf43af326980857ebf1496bac56bd8e8
Parents: 50a8ec2
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Sun Jun 5 15:34:03 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Sun Jun 5 15:34:03 2016 +0300
----------------------------------------------------------------------
.../apache/ignite/internal/IgniteKernal.java | 6 +-
.../internal/cluster/ClusterGroupAdapter.java | 6 +
.../ignite/internal/cluster/ClusterGroupEx.java | 14 ++-
.../internal/processors/igfs/IgfsContext.java | 14 +--
.../internal/processors/igfs/IgfsImpl.java | 17 ++-
.../processors/igfs/IgfsMetaManager.java | 46 +++++++-
.../processors/igfs/IgfsNodePredicate.java | 73 ++++++++++++
.../internal/processors/igfs/IgfsProcessor.java | 37 ++++--
.../internal/processors/igfs/IgfsUtils.java | 21 ++++
.../igfs/client/IgfsClientAbstractCallable.java | 112 +++++++++++++++++++
.../igfs/client/IgfsClientDeleteCallable.java | 83 ++++++++++++++
.../igfs/client/IgfsClientExistsCallable.java | 76 +++++++++++++
.../igfs/client/IgfsClientFileIdsCallable.java | 79 +++++++++++++
.../igfs/client/IgfsClientMkdirsCallable.java | 106 ++++++++++++++++++
.../multijvm/IgniteClusterProcessProxy.java | 5 +
15 files changed, 664 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/79644c16/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index d1f3ef5..d257807 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -862,9 +862,9 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
startProcessor(new DataStreamProcessor(ctx));
startProcessor((GridProcessor)IGFS.create(ctx, F.isEmpty(cfg.getFileSystemConfiguration())));
startProcessor(new GridContinuousProcessor(ctx));
- startProcessor((GridProcessor)(cfg.isPeerClassLoadingEnabled() ?
- IgniteComponentType.HADOOP.create(ctx, true): // No-op when peer class loading is enabled.
- IgniteComponentType.HADOOP.createIfInClassPath(ctx, cfg.getHadoopConfiguration() != null)));
+// startProcessor((GridProcessor)(cfg.isPeerClassLoadingEnabled() ?
+// IgniteComponentType.HADOOP.create(ctx, true): // No-op when peer class loading is enabled.
+// IgniteComponentType.HADOOP.createIfInClassPath(ctx, cfg.getHadoopConfiguration() != null)));
startProcessor(new DataStructuresProcessor(ctx));
startProcessor(createComponent(PlatformProcessor.class, ctx));
http://git-wip-us.apache.org/repos/asf/ignite/blob/79644c16/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java
index 75168a1..c664f1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupAdapter.java
@@ -51,6 +51,7 @@ import org.apache.ignite.internal.IgniteServicesImpl;
import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.internal.executor.GridExecutorService;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.processors.igfs.IgfsNodePredicate;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -602,6 +603,11 @@ public class ClusterGroupAdapter implements ClusterGroupEx, Externalizable {
}
/** {@inheritDoc} */
+ @Override public ClusterGroup forIgfsMetadataDataNodes(@Nullable String igfsName, @Nullable String metaCacheName) {
+ return forPredicate(new IgfsNodePredicate(igfsName)).forDataNodes(metaCacheName);
+ }
+
+ /** {@inheritDoc} */
@Override public final ClusterGroup forHost(ClusterNode node) {
A.notNull(node, "node");
http://git-wip-us.apache.org/repos/asf/ignite/blob/79644c16/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupEx.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupEx.java
index 59da7cf..21533a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterGroupEx.java
@@ -29,7 +29,7 @@ public interface ClusterGroupEx extends ClusterGroup {
* Creates projection for specified subject ID.
*
* @param subjId Subject ID.
- * @return Internal projection.
+ * @return Cluster group.
*/
public ClusterGroupEx forSubjectId(UUID subjId);
@@ -40,5 +40,15 @@ public interface ClusterGroupEx extends ClusterGroup {
* @param clientNodes Flag to include client nodes.
* @return Cluster group.
*/
- public ClusterGroup forCacheNodes(@Nullable String cacheName, boolean affNodes, boolean nearNodes, boolean clientNodes);
+ public ClusterGroup forCacheNodes(@Nullable String cacheName, boolean affNodes, boolean nearNodes,
+ boolean clientNodes);
+
+ /**
+ * Create projection for nodes which have the given IGFS and are data nodes of its metadata cache.
+ *
+ * @param igfsName IGFS name.
+ * @param metaCacheName Metadata cache name.
+ * @return Cluster group.
+ */
+ public ClusterGroup forIgfsMetadataDataNodes(@Nullable String igfsName, @Nullable String metaCacheName);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/79644c16/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
index 2b08f28..a638bf3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsContext.java
@@ -24,11 +24,8 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.FileSystemConfiguration;
import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.util.typedef.F;
import org.jetbrains.annotations.Nullable;
-import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGFS;
-
/**
* IGFS context holding all required components for IGFS instance.
*/
@@ -178,16 +175,7 @@ public class IgfsContext {
* @return {@code True} if node has IGFS with this name, {@code false} otherwise.
*/
public boolean igfsNode(ClusterNode node) {
- assert node != null;
-
- IgfsAttributes[] igfs = node.attribute(ATTR_IGFS);
-
- if (igfs != null)
- for (IgfsAttributes attrs : igfs)
- if (F.eq(cfg.getName(), attrs.igfsName()))
- return true;
-
- return false;
+ return IgfsUtils.isIgfsNode(node, cfg.getName());
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/79644c16/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 967d962..19dc9fa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -55,6 +55,9 @@ import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.hadoop.HadoopPayloadAware;
+import org.apache.ignite.internal.processors.igfs.client.IgfsClientDeleteCallable;
+import org.apache.ignite.internal.processors.igfs.client.IgfsClientExistsCallable;
+import org.apache.ignite.internal.processors.igfs.client.IgfsClientMkdirsCallable;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
@@ -96,13 +99,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_DELETED;
-import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_RENAMED;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CLOSED_READ;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CLOSED_WRITE;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_DELETED;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_READ;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_WRITE;
-import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_RENAMED;
import static org.apache.ignite.events.EventType.EVT_IGFS_META_UPDATED;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
@@ -532,6 +533,9 @@ public final class IgfsImpl implements IgfsEx {
@Override public boolean exists(final IgfsPath path) {
A.notNull(path, "path");
+ if (meta.isClient())
+ return meta.runClientTask(new IgfsClientExistsCallable(cfg.getName(), path));
+
return safeOp(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
if (log.isDebugEnabled())
@@ -710,6 +714,9 @@ public final class IgfsImpl implements IgfsEx {
@Override public boolean delete(final IgfsPath path, final boolean recursive) {
A.notNull(path, "path");
+ if (meta.isClient())
+ return meta.runClientTask(new IgfsClientDeleteCallable(cfg.getName(), path, recursive));
+
return safeOp(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
if (log.isDebugEnabled())
@@ -763,6 +770,12 @@ public final class IgfsImpl implements IgfsEx {
@Override public void mkdirs(final IgfsPath path, @Nullable final Map<String, String> props) {
A.notNull(path, "path");
+ if (meta.isClient()) {
+ meta.runClientTask(new IgfsClientMkdirsCallable(cfg.getName(), path, props));
+
+ return ;
+ }
+
safeOp(new Callable<Void>() {
@Override public Void call() throws Exception {
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/ignite/blob/79644c16/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index a4212ba..5f3d996 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -18,10 +18,13 @@
package org.apache.ignite.internal.processors.igfs;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.configuration.FileSystemConfiguration;
import org.apache.ignite.events.EventType;
import org.apache.ignite.events.IgfsEvent;
@@ -37,6 +40,7 @@ import org.apache.ignite.igfs.IgfsPathIsNotDirectoryException;
import org.apache.ignite.igfs.IgfsPathNotFoundException;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
+import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
@@ -44,6 +48,8 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheInternal;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.igfs.client.IgfsClientAbstractCallable;
+import org.apache.ignite.internal.processors.igfs.client.IgfsClientFileIdsCallable;
import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaDirectoryCreateProcessor;
import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaFileCreateProcessor;
import org.apache.ignite.internal.processors.igfs.meta.IgfsMetaFileLockProcessor;
@@ -139,13 +145,18 @@ public class IgfsMetaManager extends IgfsManager {
/** Relaxed flag. */
private final boolean relaxed;
+ /** Client flag. */
+ private final boolean client;
+
/**
* Constructor.
*
* @param relaxed Relaxed mode flag.
+ * @param client Client flag.
*/
- public IgfsMetaManager(boolean relaxed) {
+ public IgfsMetaManager(boolean relaxed, boolean client) {
this.relaxed = relaxed;
+ this.client = client;
}
/**
@@ -217,6 +228,36 @@ public class IgfsMetaManager extends IgfsManager {
}
/**
+ * @return Client flag.
+ */
+ boolean isClient() {
+ // The flag is final and assigned once by the constructor.
+ return client;
+ }
+
+    /**
+     * Execute the given task through compute on the group of nodes returned by
+     * {@code forIgfsMetadataDataNodes} for this IGFS name and metadata cache name.
+     * Must be invoked only when this manager was created with the client flag set.
+     *
+     * @param task Task.
+     * @return Result.
+     */
+    <T> T runClientTask(IgfsClientAbstractCallable<T> task) {
+        assert client;
+
+        IgniteEx grid = igfsCtx.kernalContext().grid();
+
+        // Compute facade bound to the IGFS metadata data nodes only.
+        IgniteCompute metaCompute = grid.compute(
+            grid.cluster().forIgfsMetadataDataNodes(cfg.getName(), cfg.getMetaCacheName()));
+
+        try {
+            return metaCompute.call(task);
+        }
+        catch (ClusterTopologyException e) {
+            // Translate the topology failure into an IGFS-level exception, keeping the cause.
+            throw new IgfsException("Failed to execute operation because there are no IGFS metadata nodes left.", e);
+        }
+    }
+
+ /**
* Return nodes where meta cache is defined.
*
* @return Nodes where meta cache is defined.
@@ -398,6 +439,9 @@ public class IgfsMetaManager extends IgfsManager {
private List<IgniteUuid> fileIds(IgfsPath path, boolean skipTx) throws IgniteCheckedException {
assert path != null;
+ if (client)
+ return runClientTask(new IgfsClientFileIdsCallable(cfg.getName(), path));
+
// Path components.
Collection<String> components = path.components();
http://git-wip-us.apache.org/repos/asf/ignite/blob/79644c16/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsNodePredicate.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsNodePredicate.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsNodePredicate.java
new file mode 100644
index 0000000..aa5b2a8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsNodePredicate.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.BinaryReader;
+import org.apache.ignite.binary.BinaryWriter;
+import org.apache.ignite.binary.Binarylizable;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * IGFS node predicate.
+ * <p>
+ * Accepts a node when {@link IgfsUtils#isIgfsNode(ClusterNode, String)} reports that the node
+ * contains IGFS with the name held by this predicate.
+ */
+public class IgfsNodePredicate implements IgnitePredicate<ClusterNode>, Binarylizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** IGFS name. */
+    private String igfsName;
+
+    /**
+     * Default constructor.
+     */
+    public IgfsNodePredicate() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param igfsName IGFS name.
+     */
+    public IgfsNodePredicate(@Nullable String igfsName) {
+        this.igfsName = igfsName;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean apply(ClusterNode node) {
+        return IgfsUtils.isIgfsNode(node, igfsName);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeBinary(BinaryWriter writer) throws BinaryObjectException {
+        // The IGFS name is the predicate's only state, so it must be written explicitly;
+        // otherwise it does not survive a binary round trip.
+        writer.rawWriter().writeString(igfsName);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readBinary(BinaryReader reader) throws BinaryObjectException {
+        // Mirror of writeBinary: restore the single raw string field.
+        igfsName = reader.rawReader().readString();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgfsNodePredicate.class, this);
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/79644c16/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
index 778de99..92b43cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsProcessor.java
@@ -95,10 +95,12 @@ public class IgfsProcessor extends IgfsProcessorAdapter {
/** {@inheritDoc} */
@Override public void start() throws IgniteCheckedException {
- if (ctx.config().isDaemon())
+ IgniteConfiguration igniteCfg = ctx.config();
+
+ if (igniteCfg.isDaemon())
return;
- FileSystemConfiguration[] cfgs = ctx.config().getFileSystemConfiguration();
+ FileSystemConfiguration[] cfgs = igniteCfg.getFileSystemConfiguration();
assert cfgs != null && cfgs.length > 0;
@@ -108,10 +110,27 @@ public class IgfsProcessor extends IgfsProcessorAdapter {
for (FileSystemConfiguration cfg : cfgs) {
FileSystemConfiguration cfg0 = new FileSystemConfiguration(cfg);
+ boolean metaClient = true;
+
+ CacheConfiguration[] cacheCfgs = igniteCfg.getCacheConfiguration();
+
+ if (cacheCfgs != null) {
+ for (CacheConfiguration cacheCfg : cacheCfgs) {
+ if (F.eq(cacheCfg.getName(), cfg.getMetaCacheName())) {
+ metaClient = false;
+
+ break;
+ }
+ }
+ }
+
+ if (igniteCfg.isClientMode() != null && igniteCfg.isClientMode())
+ metaClient = true;
+
IgfsContext igfsCtx = new IgfsContext(
ctx,
cfg0,
- new IgfsMetaManager(cfg0.isRelaxedConsistency()),
+ new IgfsMetaManager(cfg0.isRelaxedConsistency(), metaClient),
new IgfsDataManager(),
new IgfsServerManager(),
new IgfsFragmentizerManager());
@@ -126,19 +145,17 @@ public class IgfsProcessor extends IgfsProcessorAdapter {
if (log.isDebugEnabled())
log.debug("IGFS processor started.");
- IgniteConfiguration gridCfg = ctx.config();
-
// Node doesn't have IGFS if it:
// is daemon;
// doesn't have configured IGFS;
// doesn't have configured caches.
- if (gridCfg.isDaemon() || F.isEmpty(gridCfg.getFileSystemConfiguration()) ||
- F.isEmpty(gridCfg.getCacheConfiguration()))
+ if (igniteCfg.isDaemon() || F.isEmpty(igniteCfg.getFileSystemConfiguration()) ||
+ F.isEmpty(igniteCfg.getCacheConfiguration()))
return;
final Map<String, CacheConfiguration> cacheCfgs = new HashMap<>();
- F.forEach(gridCfg.getCacheConfiguration(), new CI1<CacheConfiguration>() {
+ F.forEach(igniteCfg.getCacheConfiguration(), new CI1<CacheConfiguration>() {
@Override public void apply(CacheConfiguration c) {
cacheCfgs.put(c.getName(), c);
}
@@ -146,9 +163,9 @@ public class IgfsProcessor extends IgfsProcessorAdapter {
Collection<IgfsAttributes> attrVals = new ArrayList<>();
- assert gridCfg.getFileSystemConfiguration() != null;
+ assert igniteCfg.getFileSystemConfiguration() != null;
- for (FileSystemConfiguration igfsCfg : gridCfg.getFileSystemConfiguration()) {
+ for (FileSystemConfiguration igfsCfg : igniteCfg.getFileSystemConfiguration()) {
CacheConfiguration cacheCfg = cacheCfgs.get(igfsCfg.getDataCacheName());
if (cacheCfg == null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/79644c16/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
index 1b97565..e45e34e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
@@ -56,6 +56,7 @@ import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_RETRIES_COUNT;
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGFS;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
@@ -681,4 +682,24 @@ public class IgfsUtils {
else
return null;
}
+
+    /**
+     * Tell whether the given node's IGFS attribute lists an IGFS with the given name.
+     *
+     * @param node Node to inspect.
+     * @param igfsName IGFS name to look for.
+     * @return {@code True} if one of the node's IGFS attribute entries has this name.
+     */
+    public static boolean isIgfsNode(ClusterNode node, String igfsName) {
+        assert node != null;
+
+        IgfsAttributes[] nodeIgfs = node.attribute(ATTR_IGFS);
+
+        // Attribute is absent: nothing to match against.
+        if (nodeIgfs == null)
+            return false;
+
+        for (IgfsAttributes igfsAttrs : nodeIgfs) {
+            if (F.eq(igfsName, igfsAttrs.igfsName()))
+                return true;
+        }
+
+        return false;
+    }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/79644c16/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientAbstractCallable.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientAbstractCallable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientAbstractCallable.java
new file mode 100644
index 0000000..b83ed13
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientAbstractCallable.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs.client;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.BinaryRawReader;
+import org.apache.ignite.binary.BinaryRawWriter;
+import org.apache.ignite.binary.BinaryReader;
+import org.apache.ignite.binary.BinaryWriter;
+import org.apache.ignite.binary.Binarylizable;
+import org.apache.ignite.internal.processors.igfs.IgfsContext;
+import org.apache.ignite.internal.processors.igfs.IgfsEx;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Base class for IGFS callables which are initiated on a client node and passed to a data node.
+ * <p>
+ * This class resolves the IGFS by name and (de)serializes the IGFS name; subclasses provide the
+ * actual operation and the (de)serialization of their own fields.
+ */
+public abstract class IgfsClientAbstractCallable<T> implements IgniteCallable<T>, Binarylizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** IGFS name. */
+    protected String igfsName;
+
+    /** Ignite instance, marked for injection with {@link IgniteInstanceResource}. */
+    @IgniteInstanceResource
+    private transient Ignite ignite;
+
+    /**
+     * Default constructor.
+     */
+    protected IgfsClientAbstractCallable() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param igfsName IGFS name.
+     */
+    protected IgfsClientAbstractCallable(@Nullable String igfsName) {
+        this.igfsName = igfsName;
+    }
+
+    /** {@inheritDoc} */
+    @Override public final T call() throws Exception {
+        assert ignite != null;
+
+        // Look the IGFS up by name and hand its context to the concrete task.
+        IgfsContext igfsCtx = ((IgfsEx)ignite.fileSystem(igfsName)).context();
+
+        return call0(igfsCtx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public final void writeBinary(BinaryWriter writer) throws BinaryObjectException {
+        BinaryRawWriter rawOut = writer.rawWriter();
+
+        // IGFS name goes first, subclass fields follow.
+        rawOut.writeString(igfsName);
+
+        writeBinary0(rawOut);
+    }
+
+    /** {@inheritDoc} */
+    @Override public final void readBinary(BinaryReader reader) throws BinaryObjectException {
+        BinaryRawReader rawIn = reader.rawReader();
+
+        // Same order as in writeBinary: IGFS name first, then subclass fields.
+        igfsName = rawIn.readString();
+
+        readBinary0(rawIn);
+    }
+
+    /**
+     * Execute task.
+     *
+     * @param ctx IGFS context.
+     * @return Result.
+     * @throws Exception If failed.
+     */
+    protected abstract T call0(IgfsContext ctx) throws Exception;
+
+    /**
+     * Write subclass fields in raw mode.
+     *
+     * @param rawWriter Raw writer.
+     */
+    protected abstract void writeBinary0(BinaryRawWriter rawWriter);
+
+    /**
+     * Read subclass fields in raw mode.
+     *
+     * @param rawReader Raw reader.
+     */
+    protected abstract void readBinary0(BinaryRawReader rawReader);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/79644c16/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientDeleteCallable.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientDeleteCallable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientDeleteCallable.java
new file mode 100644
index 0000000..9b0095a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientDeleteCallable.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs.client;
+
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.BinaryRawReader;
+import org.apache.ignite.binary.BinaryRawWriter;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.processors.igfs.IgfsContext;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * IGFS client delete callable.
+ */
+public class IgfsClientDeleteCallable extends IgfsClientAbstractCallable<Boolean> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Path. */
+    private IgfsPath path;
+
+    /** Recursion flag. */
+    private boolean recursive;
+
+    /**
+     * Default constructor.
+     */
+    public IgfsClientDeleteCallable() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param igfsName IGFS name.
+     * @param path Path to delete.
+     * @param recursive Recursive flag.
+     */
+    public IgfsClientDeleteCallable(@Nullable String igfsName, IgfsPath path, boolean recursive) {
+        super(igfsName);
+
+        this.recursive = recursive;
+        this.path = path;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Boolean call0(IgfsContext ctx) throws Exception {
+        // Delegate to the IGFS instance taken from the provided context.
+        return ctx.igfs().delete(path, recursive);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeBinary0(BinaryRawWriter rawWriter) throws BinaryObjectException {
+        // Field order must match readBinary0.
+        rawWriter.writeObject(path);
+        rawWriter.writeBoolean(recursive);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readBinary0(BinaryRawReader rawReader) throws BinaryObjectException {
+        // Field order must match writeBinary0.
+        path = rawReader.readObject();
+        recursive = rawReader.readBoolean();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgfsClientDeleteCallable.class, this);
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/79644c16/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientExistsCallable.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientExistsCallable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientExistsCallable.java
new file mode 100644
index 0000000..adef2db
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientExistsCallable.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs.client;
+
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.BinaryRawReader;
+import org.apache.ignite.binary.BinaryRawWriter;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.processors.igfs.IgfsContext;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * IGFS client file IDs callable.
+ */
+public class IgfsClientExistsCallable extends IgfsClientAbstractCallable<Boolean> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Path. */
+ private IgfsPath path;
+
+ /**
+ * Default constructor.
+ */
+ public IgfsClientExistsCallable() {
+ // NO-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param igfsName IGFS name.
+ * @param path Path.
+ */
+ public IgfsClientExistsCallable(@Nullable String igfsName, IgfsPath path) {
+ super(igfsName);
+
+ this.path = path;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Boolean call0(IgfsContext ctx) throws Exception {
+ return ctx.igfs().exists(path);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeBinary0(BinaryRawWriter writer) throws BinaryObjectException {
+ writer.writeObject(path);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readBinary0(BinaryRawReader reader) throws BinaryObjectException {
+ path = reader.readObject();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IgfsClientExistsCallable.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/79644c16/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientFileIdsCallable.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientFileIdsCallable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientFileIdsCallable.java
new file mode 100644
index 0000000..a6c3ea8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientFileIdsCallable.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs.client;
+
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.BinaryRawReader;
+import org.apache.ignite.binary.BinaryRawWriter;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.processors.igfs.IgfsContext;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.List;
+
+/**
+ * IGFS client callable returning the file IDs for a path.
+ */
+public class IgfsClientFileIdsCallable extends IgfsClientAbstractCallable<List<IgniteUuid>> {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Path handed to the file IDs lookup. */
+    private IgfsPath path;
+
+    /** Creates an instance with an unset path; {@link #readBinary0(BinaryRawReader)} can populate it. */
+    public IgfsClientFileIdsCallable() {
+        // Nothing to initialize.
+    }
+
+    /**
+     * Creates a callable for the given IGFS name and path.
+     *
+     * @param igfsName IGFS name, passed on to the parent constructor.
+     * @param path Path to look up.
+     */
+    public IgfsClientFileIdsCallable(@Nullable String igfsName, IgfsPath path) {
+        super(igfsName);
+
+        this.path = path;
+    }
+
+    /** Returns whatever {@code ctx.meta().fileIds(path)} yields for the stored path. */
+    @Override protected List<IgniteUuid> call0(IgfsContext ctx) throws Exception {
+        List<IgniteUuid> ids = ctx.meta().fileIds(path);
+
+        return ids;
+    }
+
+    /** Writes the path as the only field. */
+    @Override public void writeBinary0(BinaryRawWriter writer) throws BinaryObjectException {
+        writer.writeObject(this.path);
+    }
+
+    /** Reads the path written by {@link #writeBinary0(BinaryRawWriter)}. */
+    @Override public void readBinary0(BinaryRawReader reader) throws BinaryObjectException {
+        this.path = reader.readObject();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgfsClientFileIdsCallable.class, this);
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/79644c16/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientMkdirsCallable.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientMkdirsCallable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientMkdirsCallable.java
new file mode 100644
index 0000000..f799026
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientMkdirsCallable.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs.client;
+
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.BinaryRawReader;
+import org.apache.ignite.binary.BinaryRawWriter;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.processors.igfs.IgfsContext;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * IGFS client mkdirs callable.
+ */
+public class IgfsClientMkdirsCallable extends IgfsClientAbstractCallable<Void> {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /** Path handed to the mkdirs call. */
+    private IgfsPath path;
+
+    /** Properties handed to the mkdirs call; may be {@code null}. */
+    private Map<String, String> props;
+
+    /** Creates an instance with unset fields; {@link #readBinary0(BinaryRawReader)} can populate them. */
+    public IgfsClientMkdirsCallable() {
+        // Nothing to initialize.
+    }
+
+    /**
+     * Creates a callable for the given IGFS name, path and properties.
+     *
+     * @param igfsName IGFS name, passed on to the parent constructor.
+     * @param path Path of the directory to create.
+     * @param props Properties for the mkdirs call; {@code null} and empty are kept distinct when serialized.
+     */
+    public IgfsClientMkdirsCallable(@Nullable String igfsName, IgfsPath path, @Nullable Map<String, String> props) {
+        super(igfsName);
+
+        this.path = path;
+        this.props = props;
+    }
+
+    /** Runs {@code mkdirs(path, props)} on the IGFS of the given context; always returns {@code null}. */
+    @Override protected Void call0(IgfsContext ctx) throws Exception {
+        ctx.igfs().mkdirs(path, props);
+
+        return null;
+    }
+
+    /** Writes the path, then the property count ({@code -1} for {@code null}), then each key/value pair. */
+    @Override public void writeBinary0(BinaryRawWriter writer) throws BinaryObjectException {
+        writer.writeObject(path);
+
+        if (props != null) {
+            writer.writeInt(props.size());
+
+            for (Map.Entry<String, String> prop : props.entrySet()) {
+                writer.writeString(prop.getKey());
+                writer.writeString(prop.getValue());
+            }
+        }
+        else
+            writer.writeInt(-1); // Null marker: a size of 0 is reserved for an empty map.
+    }
+
+    /** Reads the path, then the property count; a negative count restores {@code null} properties. */
+    @Override public void readBinary0(BinaryRawReader reader) throws BinaryObjectException {
+        path = reader.readObject();
+
+        int propsSize = reader.readInt();
+
+        if (propsSize >= 0) {
+            props = new HashMap<>(propsSize, 1.0f);
+
+            for (int i = 0; i < propsSize; i++)
+                props.put(reader.readString(), reader.readString());
+        }
+        else
+            props = null; // Negative size is the null marker written by writeBinary0().
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgfsClientMkdirsCallable.class, this);
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/79644c16/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java
index 633e9d0..76a88d9 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/IgniteClusterProcessProxy.java
@@ -69,6 +69,11 @@ public class IgniteClusterProcessProxy implements IgniteClusterEx {
}
/** {@inheritDoc} */
+ @Override public ClusterGroup forIgfsMetadataDataNodes(@Nullable String igfsName, @Nullable String metaCacheName) {
+ throw new UnsupportedOperationException("Operation is not supported yet."); // Not implemented by this proxy.
+ }
+
+ /** {@inheritDoc} */
@Override public ClusterNode localNode() {
return compute.call(new LocalNodeTask());
}