You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/03/18 15:07:15 UTC
[01/12] ignite git commit: IGNITE-2853 - Fixed cancellation of the
job that depends on a service
Repository: ignite
Updated Branches:
refs/heads/ignite-2813 faa897f7b -> dc71d212c
IGNITE-2853 - Fixed cancellation of the job that depends on a service
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d3420e6b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d3420e6b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d3420e6b
Branch: refs/heads/ignite-2813
Commit: d3420e6bc5e833a6eb1daaad25b11843f97328d5
Parents: 69d1f4b
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Wed Mar 16 22:21:24 2016 -0700
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Wed Mar 16 22:21:24 2016 -0700
----------------------------------------------------------------------
.../apache/ignite/internal/IgniteKernal.java | 2 +-
.../ComputeJobCancelWithServiceSelfTest.java | 154 +++++++++++++++++++
.../testsuites/IgniteKernalSelfTestSuite.java | 2 +
3 files changed, 157 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d3420e6b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 5d8daf6..8df89f3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -849,6 +849,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
startProcessor(createComponent(IgniteCacheObjectProcessor.class, ctx));
startProcessor(new GridCacheProcessor(ctx));
startProcessor(new GridQueryProcessor(ctx));
+ startProcessor(new GridServiceProcessor(ctx));
startProcessor(new GridTaskSessionProcessor(ctx));
startProcessor(new GridJobProcessor(ctx));
startProcessor(new GridTaskProcessor(ctx));
@@ -860,7 +861,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
startProcessor((GridProcessor)(cfg.isPeerClassLoadingEnabled() ?
IgniteComponentType.HADOOP.create(ctx, true): // No-op when peer class loading is enabled.
IgniteComponentType.HADOOP.createIfInClassPath(ctx, cfg.getHadoopConfiguration() != null)));
- startProcessor(new GridServiceProcessor(ctx));
startProcessor(new DataStructuresProcessor(ctx));
startProcessor(createComponent(PlatformProcessor.class, ctx));
http://git-wip-us.apache.org/repos/asf/ignite/blob/d3420e6b/modules/core/src/test/java/org/apache/ignite/internal/ComputeJobCancelWithServiceSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/ComputeJobCancelWithServiceSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/ComputeJobCancelWithServiceSelfTest.java
new file mode 100644
index 0000000..2718ed9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/ComputeJobCancelWithServiceSelfTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeTaskFuture;
+import org.apache.ignite.compute.ComputeTaskSplitAdapter;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.services.Service;
+import org.apache.ignite.services.ServiceContext;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Test cancellation of a job that depends on service.
+ * <p>
+ * A server node hosts a node-singleton service while a task submitted through a client node keeps
+ * calling that service from its job. The server is then closed while the job is still looping, and
+ * the job is expected to be able to call the service one last time and return its answer.
+ */
+public class ComputeJobCancelWithServiceSelfTest extends GridCommonAbstractTest {
+    /** IP finder used by every node started in this test. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * Starts a server with the service deployed, executes {@link MyTask} asynchronously through a client node,
+     * closes the server after a fixed delay and checks that the task still completes with the service response.
+     *
+     * @throws Exception If failed.
+     */
+    public void testJobCancel() throws Exception {
+        Ignite server = startGrid("server");
+
+        server.services().deployNodeSingleton("my-service", new MyService());
+
+        Ignite client;
+
+        Ignition.setClientMode(true);
+
+        try {
+            client = startGrid("client");
+        }
+        finally {
+            // Client mode is switched through the static Ignition API rather than through this node's
+            // configuration, so always switch it back to keep nodes started later unaffected.
+            Ignition.setClientMode(false);
+        }
+
+        IgniteCompute compute = client.compute().withAsync();
+
+        compute.execute(new MyTask(), null);
+
+        ComputeTaskFuture<Integer> fut = compute.future();
+
+        // Give the job time to start looping before the server goes away.
+        Thread.sleep(3000);
+
+        server.close();
+
+        assertEquals(42, fut.get().intValue());
+    }
+
+    /**
+     * Service that remembers whether it has been cancelled and fails any call made after cancellation.
+     */
+    private static class MyService implements Service {
+        /** Set to {@code true} once {@link #cancel(ServiceContext)} has been called. */
+        private volatile boolean cancelled;
+
+        /** {@inheritDoc} */
+        @Override public void init(ServiceContext ctx) throws Exception {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void execute(ServiceContext ctx) throws Exception {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void cancel(ServiceContext ctx) {
+            cancelled = true;
+        }
+
+        /**
+         * Asserts that the service has not been cancelled yet and returns a fixed response.
+         *
+         * @return Response.
+         */
+        public int hello() {
+            assertFalse("Service already cancelled!", cancelled);
+
+            return 42;
+        }
+    }
+
+    /**
+     * Task that produces a single job; the job polls the service until it is cancelled.
+     */
+    private static class MyTask extends ComputeTaskSplitAdapter<Object, Integer> {
+        /** {@inheritDoc} */
+        @Override protected Collection<? extends ComputeJob> split(int gridSize, Object arg) {
+            return Collections.singletonList(new ComputeJobAdapter() {
+                /** Injected Ignite instance. */
+                @IgniteInstanceResource
+                private Ignite ignite;
+
+                /** {@inheritDoc} */
+                @Override public Object execute() throws IgniteException {
+                    MyService svc = ignite.services().service("my-service");
+
+                    while (!isCancelled()) {
+                        try {
+                            Thread.sleep(1000);
+
+                            svc.hello();
+                        }
+                        catch (InterruptedException ignored) {
+                            // Deliberately ignored: the loop condition re-checks the cancellation flag,
+                            // and the job must still reach the final service call below.
+                        }
+                    }
+
+                    assertTrue(isCancelled());
+
+                    // The service must still be usable at the moment the job is cancelled.
+                    return svc.hello();
+                }
+            });
+        }
+
+        /** {@inheritDoc} */
+        @Override public Integer reduce(List<ComputeJobResult> results) {
+            assertEquals(1, results.size());
+
+            return results.get(0).getData();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d3420e6b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
index 6233bab..a8d6e5c 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
@@ -19,6 +19,7 @@ package org.apache.ignite.testsuites;
import java.util.Set;
import junit.framework.TestSuite;
+import org.apache.ignite.internal.ComputeJobCancelWithServiceSelfTest;
import org.apache.ignite.internal.GridCommunicationSelfTest;
import org.apache.ignite.internal.GridDiscoveryEventSelfTest;
import org.apache.ignite.internal.GridDiscoverySelfTest;
@@ -114,6 +115,7 @@ public class IgniteKernalSelfTestSuite extends TestSuite {
suite.addTestSuite(IgniteUpdateNotifierPerClusterSettingSelfTest.class);
suite.addTestSuite(GridLocalEventListenerSelfTest.class);
suite.addTestSuite(IgniteTopologyPrintFormatSelfTest.class);
+ suite.addTestSuite(ComputeJobCancelWithServiceSelfTest.class);
// Managed Services.
suite.addTestSuite(GridServiceProcessorSingleNodeSelfTest.class);
[08/12] ignite git commit: Merge remote-tracking branch
'origin/master'
Posted by vo...@apache.org.
Merge remote-tracking branch 'origin/master'
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/88ffed13
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/88ffed13
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/88ffed13
Branch: refs/heads/ignite-2813
Commit: 88ffed13527d43cbfa204ae90cc39722fa3a77c4
Parents: ebf4075 0b10e0c
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri Mar 18 16:38:58 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Mar 18 16:38:58 2016 +0300
----------------------------------------------------------------------
.../managers/communication/GridIoManager.java | 10 ++++
.../GridCachePartitionExchangeManager.java | 3 ++
.../util/nio/GridNioRecoveryDescriptor.java | 21 +++++++-
.../ignite/internal/util/nio/GridNioServer.java | 52 +++++++++++++++++++-
.../communication/tcp/TcpCommunicationSpi.java | 8 +++
.../tcp/TcpCommunicationSpiMBean.java | 8 ++-
6 files changed, 98 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
[03/12] ignite git commit: IGNITE-2834: Implemented.
Posted by vo...@apache.org.
IGNITE-2834: Implemented.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/83048d56
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/83048d56
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/83048d56
Branch: refs/heads/ignite-2813
Commit: 83048d56857a16ca87bf0bb96ba0cf274ed11e95
Parents: d3420e6
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Thu Mar 17 17:02:10 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Mar 17 17:02:10 2016 +0300
----------------------------------------------------------------------
.../configuration/FileSystemConfiguration.java | 47 ++++++++++++++++
.../org/apache/ignite/internal/IgnitionEx.java | 10 ++--
.../discovery/GridDiscoveryManager.java | 3 +-
.../processors/cache/GridCacheUtils.java | 20 -------
.../binary/CacheObjectBinaryProcessorImpl.java | 3 +-
.../IgfsColocatedMetadataAffinityKeyMapper.java | 47 ++++++++++++++++
.../internal/processors/igfs/IgfsUtils.java | 56 ++++++++++++++++++++
.../visor/node/VisorNodeDataCollectorJob.java | 4 +-
8 files changed, 160 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/83048d56/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
index 1a9c0fe..99d364e 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
@@ -19,6 +19,7 @@ package org.apache.ignite.configuration;
import java.util.Map;
import java.util.concurrent.ExecutorService;
+import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration;
import org.apache.ignite.igfs.IgfsMode;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
@@ -83,6 +84,9 @@ public class FileSystemConfiguration {
/** Default value of whether to initialize default path modes. */
public static final boolean DFLT_INIT_DFLT_PATH_MODES = true;
+ /** Default value of metadata co-location flag. Declared {@code final} so that the default cannot be reassigned. */
+ public static final boolean DFLT_COLOCATE_META = true;
+
/** IGFS instance name. */
private String name;
@@ -164,6 +168,9 @@ public class FileSystemConfiguration {
/** Whether to initialize default path modes. */
private boolean initDfltPathModes = DFLT_INIT_DFLT_PATH_MODES;
+ /** Metadata co-location flag. */
+ private boolean colocateMeta = DFLT_COLOCATE_META;
+
/**
* Constructs default configuration.
*/
@@ -184,6 +191,7 @@ public class FileSystemConfiguration {
*/
blockSize = cfg.getBlockSize();
bufSize = cfg.getStreamBufferSize();
+ colocateMeta = cfg.isColocateMetadata();
dataCacheName = cfg.getDataCacheName();
dfltMode = cfg.getDefaultMode();
dualModeMaxPendingPutsSize = cfg.getDualModeMaxPendingPutsSize();
@@ -830,6 +838,45 @@ public class FileSystemConfiguration {
this.initDfltPathModes = initDfltPathModes;
}
+ /**
+ * Get whether to co-locate metadata on a single node.
+ * <p>
+ * Normally Ignite spreads ownership of particular keys among all cache nodes. A transaction with keys owned by
+ * different nodes will produce more network traffic and will require more time to complete compared to a
+ * transaction with keys owned only by a single node.
+ * <p>
+ * IGFS stores information about file system structure (metadata) inside a transactional cache configured through
+ * {@link #getMetaCacheName()} property. Metadata updates caused by operations on IGFS usually require several
+ * internal keys to be updated. As IGFS metadata cache usually operates in {@link CacheMode#REPLICATED} mode,
+ * meaning that all nodes have all metadata locally, it makes sense to give a hint to Ignite to co-locate
+ * ownership of all metadata keys on a single node. This will decrease the number of network trips required to
+ * update metadata and hence could improve performance.
+ * <p>
+ * This property should be disabled if you see excessive CPU and network load on a single node, which
+ * degrades performance and cannot be explained by business logic of your application.
+ * <p>
+ * This setting is only used if metadata cache is configured in {@link CacheMode#REPLICATED} mode. Otherwise it
+ * is ignored.
+ * <p>
+ * Defaults to {@link #DFLT_COLOCATE_META}.
+ *
+ * @return {@code True} if metadata co-location is enabled.
+ */
+ public boolean isColocateMetadata() {
+ return colocateMeta;
+ }
+
+ /**
+ * Set metadata co-location flag.
+ * <p>
+ * See {@link #isColocateMetadata()} for more information.
+ *
+ * @param colocateMeta {@code True} to enable metadata co-location, {@code false} to disable it.
+ */
+ public void setColocateMetadata(boolean colocateMeta) {
+ this.colocateMeta = colocateMeta;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(FileSystemConfiguration.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/83048d56/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 4796581..f560917 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -62,6 +62,7 @@ import org.apache.ignite.configuration.FileSystemConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.TransactionConfiguration;
import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.internal.processors.igfs.IgfsUtils;
import org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -1950,14 +1951,11 @@ public class IgnitionEx {
cfg.setCacheConfiguration(cacheCfgs.toArray(new CacheConfiguration[cacheCfgs.size()]));
- // Iterate over IGFS caches and set "copyOnRead" flag to "false". Note that we do this after cloning
- // to leave user object unchanged.
+ // Iterate over IGFS caches and prepare their configuration if needed.
assert cfg.getCacheConfiguration() != null;
- for (CacheConfiguration ccfg : cfg.getCacheConfiguration()) {
- if (CU.isIgfsCache(cfg, ccfg.getName()))
- ccfg.setCopyOnRead(false);
- }
+ for (CacheConfiguration ccfg : cfg.getCacheConfiguration())
+ IgfsUtils.prepareCacheConfiguration(cfg, ccfg);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/83048d56/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 7b795d5..7936a04 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -75,6 +75,7 @@ import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.igfs.IgfsUtils;
import org.apache.ignite.internal.processors.jobmetrics.GridJobMetrics;
import org.apache.ignite.internal.processors.security.SecurityContext;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
@@ -1622,7 +1623,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
CachePredicate pred = entry.getValue();
- if (!CU.isSystemCache(cacheName) && !CU.isIgfsCache(ctx.config(), cacheName) &&
+ if (!CU.isSystemCache(cacheName) && !IgfsUtils.isIgfsCache(ctx.config(), cacheName) &&
pred != null && pred.cacheNode(node))
caches.put(cacheName, pred.cacheMode);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/83048d56/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 94307ca..617d9b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -1377,26 +1377,6 @@ public class GridCacheUtils {
// }
/**
- * @param cfg Grid configuration.
- * @param cacheName Cache name.
- * @return {@code True} in this is IGFS data or meta cache.
- */
- public static boolean isIgfsCache(IgniteConfiguration cfg, @Nullable String cacheName) {
- FileSystemConfiguration[] igfsCfgs = cfg.getFileSystemConfiguration();
-
- if (igfsCfgs != null) {
- for (FileSystemConfiguration igfsCfg : igfsCfgs) {
- // IGFS config probably has not been validated yet => possible NPE, so we check for null.
- if (igfsCfg != null &&
- (F.eq(cacheName, igfsCfg.getDataCacheName()) || F.eq(cacheName, igfsCfg.getMetaCacheName())))
- return true;
- }
- }
-
- return false;
- }
-
- /**
* Convert TTL to expire time.
*
* @param ttl TTL.
http://git-wip-us.apache.org/repos/asf/ignite/blob/83048d56/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index 18e2d09..0a73fe0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -82,6 +82,7 @@ import org.apache.ignite.internal.processors.cache.query.CacheQuery;
import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessorImpl;
+import org.apache.ignite.internal.processors.igfs.IgfsUtils;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.lang.GridMapEntry;
@@ -735,7 +736,7 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
assert cfg != null;
boolean binaryEnabled = marsh instanceof BinaryMarshaller && !GridCacheUtils.isSystemCache(cfg.getName()) &&
- !GridCacheUtils.isIgfsCache(ctx.config(), cfg.getName());
+ !IgfsUtils.isIgfsCache(ctx.config(), cfg.getName());
CacheObjectContext ctx0 = super.contextForCache(cfg);
http://git-wip-us.apache.org/repos/asf/ignite/blob/83048d56/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsColocatedMetadataAffinityKeyMapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsColocatedMetadataAffinityKeyMapper.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsColocatedMetadataAffinityKeyMapper.java
new file mode 100644
index 0000000..d8c10b0
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsColocatedMetadataAffinityKeyMapper.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.cache.affinity.AffinityKeyMapper;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Special implementation of affinity mapper which maps all metadata to the same primary.
+ * <p>
+ * Every cache key is mapped to one and the same constant affinity key, see {@link #affinityKey(Object)}.
+ */
+public class IgfsColocatedMetadataAffinityKeyMapper implements AffinityKeyMapper {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Constant affinity key returned for every cache key. */
+ private static final Integer AFF = 1;
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * The passed key is ignored: the same constant is returned for every key.
+ */
+ @Override public Object affinityKey(Object key) {
+ return AFF;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void reset() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IgfsColocatedMetadataAffinityKeyMapper.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/83048d56/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
index edded2f..46483d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
@@ -24,9 +24,12 @@ import java.util.concurrent.ThreadLocalRandom;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.FileSystemConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.IgfsEvent;
import org.apache.ignite.igfs.IgfsException;
import org.apache.ignite.igfs.IgfsPath;
@@ -253,4 +256,57 @@ public class IgfsUtils {
if (evts.isRecordable(type))
evts.record(new IgfsEvent(path, locNode, type));
}
+
+ /**
+  * Checks whether the given cache name is the data or meta cache name of any configured IGFS.
+  *
+  * @param cfg Grid configuration.
+  * @param cacheName Cache name.
+  * @return {@code True} if this is IGFS data or meta cache.
+  */
+ public static boolean isIgfsCache(IgniteConfiguration cfg, @Nullable String cacheName) {
+     FileSystemConfiguration[] fsCfgs = cfg.getFileSystemConfiguration();
+
+     if (fsCfgs == null)
+         return false;
+
+     for (FileSystemConfiguration fsCfg : fsCfgs) {
+         // IGFS config probably has not been validated yet, so individual entries may still be null.
+         if (fsCfg == null)
+             continue;
+
+         if (F.eq(cacheName, fsCfg.getDataCacheName()))
+             return true;
+
+         if (F.eq(cacheName, fsCfg.getMetaCacheName()))
+             return true;
+     }
+
+     return false;
+ }
+
+ /**
+  * Prepare cache configuration if this is IGFS meta or data cache.
+  * <p>
+  * The first IGFS configuration whose meta or data cache name equals the name of the given cache wins:
+  * copy-on-read is switched off for that cache and, for a meta cache only, the co-located affinity mapper
+  * is installed when co-location is enabled, the cache is {@link CacheMode#REPLICATED} and no affinity mapper
+  * has been set yet. Caches that match no IGFS configuration are left untouched.
+  *
+  * @param cfg Configuration.
+  * @param ccfg Cache configuration.
+  */
+ public static void prepareCacheConfiguration(IgniteConfiguration cfg, CacheConfiguration ccfg) {
+     FileSystemConfiguration[] fsCfgs = cfg.getFileSystemConfiguration();
+
+     if (fsCfgs == null)
+         return;
+
+     for (FileSystemConfiguration fsCfg : fsCfgs) {
+         if (fsCfg == null)
+             continue;
+
+         boolean metaCache = F.eq(ccfg.getName(), fsCfg.getMetaCacheName());
+
+         // Neither meta nor data cache of this IGFS: try the next IGFS configuration.
+         if (!metaCache && !F.eq(ccfg.getName(), fsCfg.getDataCacheName()))
+             continue;
+
+         ccfg.setCopyOnRead(false);
+
+         if (metaCache) {
+             // Set co-located affinity mapper if needed.
+             boolean colocate = fsCfg.isColocateMetadata() && ccfg.getCacheMode() == CacheMode.REPLICATED &&
+                 ccfg.getAffinityMapper() == null;
+
+             if (colocate)
+                 ccfg.setAffinityMapper(new IgfsColocatedMetadataAffinityKeyMapper());
+         }
+
+         return;
+     }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/83048d56/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
index f996d9a..e47b013 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
@@ -24,6 +24,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
import org.apache.ignite.internal.processors.igfs.IgfsProcessorAdapter;
+import org.apache.ignite.internal.processors.igfs.IgfsUtils;
import org.apache.ignite.internal.util.ipc.IpcServerEndpoint;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -35,7 +36,6 @@ import org.apache.ignite.internal.visor.igfs.VisorIgfs;
import org.apache.ignite.internal.visor.igfs.VisorIgfsEndpoint;
import org.apache.ignite.lang.IgniteProductVersion;
-import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isIgfsCache;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isSystemCache;
import static org.apache.ignite.internal.visor.compute.VisorComputeMonitoringHolder.COMPUTE_MONITORING_HOLDER_KEY;
import static org.apache.ignite.internal.visor.util.VisorTaskUtils.VISOR_TASK_EVTS;
@@ -132,7 +132,7 @@ public class VisorNodeDataCollectorJob extends VisorJob<VisorNodeDataCollectorTa
GridCacheProcessor cacheProc = ignite.context().cache();
for (String cacheName : cacheProc.cacheNames()) {
- if (arg.systemCaches() || !(isSystemCache(cacheName) || isIgfsCache(cfg, cacheName))) {
+ if (arg.systemCaches() || !(isSystemCache(cacheName) || IgfsUtils.isIgfsCache(cfg, cacheName))) {
long start0 = U.currentTimeMillis();
try {
[06/12] ignite git commit: IGNITE-2860: IGFS: Reworked base meta
operations.
Posted by vo...@apache.org.
http://git-wip-us.apache.org/repos/asf/ignite/blob/ebf40752/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
index 19a91ad..26424f0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
@@ -17,11 +17,6 @@
package org.apache.ignite.internal.processors.igfs;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.Callable;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.FileSystemConfiguration;
@@ -29,7 +24,6 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.igfs.IgfsException;
import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper;
import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteUuid;
@@ -38,6 +32,12 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.jetbrains.annotations.Nullable;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
@@ -142,7 +142,7 @@ public class IgfsMetaManagerSelfTest extends IgfsCommonAbstractTest {
assertEmpty(mgr.directoryListing(ROOT_ID));
assertTrue(mgr.mkdirs(new IgfsPath("/dir"), IgfsImpl.DFLT_DIR_META));
- assertNotNull(mgr.create(new IgfsPath("/file"), false, false, null, 400, null, false, null));
+ assertNotNull(mgr.create(new IgfsPath("/file"), null, false, 400, null, false, null));
IgfsListingEntry dirEntry = mgr.directoryListing(ROOT_ID).get("dir");
assertNotNull(dirEntry);
@@ -214,7 +214,7 @@ public class IgfsMetaManagerSelfTest extends IgfsCommonAbstractTest {
private IgfsFileInfo createFileAndGetInfo(String path) throws IgniteCheckedException {
IgfsPath p = path(path);
- IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2 = mgr.create(p, false, false, null, 400, null, false, null);
+ IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2 = mgr.create(p, null, false, 400, null, false, null);
assert t2 != null;
assert !t2.get1().isDirectory();
@@ -297,14 +297,13 @@ public class IgfsMetaManagerSelfTest extends IgfsCommonAbstractTest {
assertEquals(Arrays.asList(ROOT_ID, null, null, null, null), mgr.fileIds(new IgfsPath("/f7/a/b/f6")));
// One of participated files does not exist in cache.
- expectsRenameFail("/b8", "/b2", "Failed to perform move because some path component was not found.");
+ expectsRenameFail("/b8", "/b2");
- expectsRenameFail("/a", "/b/b8", "Failed to perform move because some path component was not found.");
+ expectsRenameFail("/a", "/b/b8");
- expectsRenameFail("/a/f2", "/a/b/f3", "Failed to perform move because destination points to existing file");
+ expectsRenameFail("/a/f2", "/a/b/f3");
- expectsRenameFail("/a/k", "/a/b/", "Failed to perform move because destination already " +
- "contains entry with the same name existing file");
+ expectsRenameFail("/a/k", "/a/b/");
mgr.delete(a.id(), "k", z.id());
mgr.delete(b.id(), "k", k.id());
@@ -414,17 +413,15 @@ public class IgfsMetaManagerSelfTest extends IgfsCommonAbstractTest {
/**
* Test expected failures for 'move file' operation.
- *
- * @param msg Failure message if expected exception was not thrown.
*/
- private void expectsRenameFail(final String src, final String dst, @Nullable String msg) {
+ private void expectsRenameFail(final String src, final String dst) {
Throwable err = assertThrowsInherited(log, new Callable() {
@Override public Object call() throws Exception {
mgr.move(new IgfsPath(src), new IgfsPath(dst));
return null;
}
- }, IgfsException.class, msg);
+ }, IgfsException.class, null);
assertTrue("Unexpected cause: " + err, err instanceof IgfsException);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ebf40752/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorSelfTest.java
index f567099..4112846 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorSelfTest.java
@@ -626,17 +626,17 @@ public class IgfsProcessorSelfTest extends IgfsCommonAbstractTest {
/** @throws Exception If failed. */
public void testCreateOpenAppend() throws Exception {
// Error - path points to root directory.
- assertCreateFails("/", false, "Failed to create file (path points to an existing directory)");
+ assertCreateFails("/", false);
// Create directories.
igfs.mkdirs(path("/A/B1/C1"));
// Error - path points to directory.
for (String path : Arrays.asList("/A", "/A/B1", "/A/B1/C1")) {
- assertCreateFails(path, false, "Failed to create file (path points to an existing directory)");
- assertCreateFails(path, true, "Failed to create file (path points to an existing directory)");
- assertAppendFails(path, false, "Failed to open file (path points to an existing directory)");
- assertAppendFails(path, true, "Failed to open file (path points to an existing directory)");
+ assertCreateFails(path, false);
+ assertCreateFails(path, true);
+ assertAppendFails(path, false);
+ assertAppendFails(path, true);
assertOpenFails(path, "Failed to open file (not a file)");
}
@@ -647,13 +647,13 @@ public class IgfsProcessorSelfTest extends IgfsCommonAbstractTest {
for (String path : Arrays.asList("/A/a", "/A/B1/a", "/A/B1/C1/a")) {
// Error - file doesn't exist.
assertOpenFails(path, "File not found");
- assertAppendFails(path, false, "File not found");
+ assertAppendFails(path, false);
// Create new and write.
assertEquals(text1, create(path, false, text1));
// Error - file already exists.
- assertCreateFails(path, false, "Failed to create file (file already exists and overwrite flag is false)");
+ assertCreateFails(path, false);
// Overwrite existent.
assertEquals(text2, create(path, true, text2));
@@ -669,7 +669,7 @@ public class IgfsProcessorSelfTest extends IgfsCommonAbstractTest {
// Error - file doesn't exist.
assertOpenFails(path, "File not found");
- assertAppendFails(path, false, "File not found");
+ assertAppendFails(path, false);
// Create with append.
assertEquals(text1, append(path, true, text1));
@@ -927,16 +927,15 @@ public class IgfsProcessorSelfTest extends IgfsCommonAbstractTest {
*
* @param path File path to create.
* @param overwrite Overwrite file if it already exists. Note: you cannot overwrite an existent directory.
- * @param msg Failure message if expected exception was not thrown.
*/
- private void assertCreateFails(final String path, final boolean overwrite, @Nullable String msg) {
+ private void assertCreateFails(final String path, final boolean overwrite) {
GridTestUtils.assertThrowsInherited(log, new Callable<Object>() {
@Override public Object call() throws Exception {
igfs.create(path(path), overwrite);
return false;
}
- }, IgfsException.class, msg);
+ }, IgfsException.class, null);
}
/**
@@ -944,16 +943,15 @@ public class IgfsProcessorSelfTest extends IgfsCommonAbstractTest {
*
* @param path File path to append.
* @param create Create file if it doesn't exist yet.
- * @param msg Failure message if expected exception was not thrown.
*/
- private void assertAppendFails(final String path, final boolean create, @Nullable String msg) {
+ private void assertAppendFails(final String path, final boolean create) {
GridTestUtils.assertThrowsInherited(log, new Callable<Object>() {
@Override public Object call() throws Exception {
igfs.append(path(path), create);
return false;
}
- }, IgfsException.class, msg);
+ }, IgfsException.class, null);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/ebf40752/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsStartCacheTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsStartCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsStartCacheTest.java
index 56559cb..eff44f2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsStartCacheTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsStartCacheTest.java
@@ -17,9 +17,6 @@
package org.apache.ignite.internal.processors.igfs;
-import java.io.BufferedWriter;
-import java.io.OutputStreamWriter;
-import java.util.concurrent.Callable;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteFileSystem;
import org.apache.ignite.configuration.CacheConfiguration;
@@ -35,6 +32,10 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
+import java.io.BufferedWriter;
+import java.io.OutputStreamWriter;
+import java.util.concurrent.Callable;
+
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
@@ -85,7 +86,7 @@ public class IgfsStartCacheTest extends IgfsCommonAbstractTest {
metaCacheCfg.setName("metaCache");
metaCacheCfg.setCacheMode(REPLICATED);
metaCacheCfg.setAtomicityMode(TRANSACTIONAL);
- dataCacheCfg.setWriteSynchronizationMode(FULL_SYNC);
+ metaCacheCfg.setWriteSynchronizationMode(FULL_SYNC);
cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg);
cfg.setFileSystemConfiguration(igfsCfg);
http://git-wip-us.apache.org/repos/asf/ignite/blob/ebf40752/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java
index c013cae..1dd665a 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/IgfsEventsTestSuite.java
@@ -46,11 +46,11 @@ public class IgfsEventsTestSuite extends TestSuite {
TestSuite suite = new TestSuite("Ignite FS Events Test Suite");
- suite.addTest(new TestSuite(ldr.loadClass(ShmemPrivate.class.getName())));
+ suite.addTest(new TestSuite(ldr.loadClass(ShmemPrimary.class.getName())));
suite.addTest(new TestSuite(ldr.loadClass(ShmemDualSync.class.getName())));
suite.addTest(new TestSuite(ldr.loadClass(ShmemDualAsync.class.getName())));
- suite.addTest(new TestSuite(ldr.loadClass(LoopbackPrivate.class.getName())));
+ suite.addTest(new TestSuite(ldr.loadClass(LoopbackPrimary.class.getName())));
suite.addTest(new TestSuite(ldr.loadClass(LoopbackDualSync.class.getName())));
suite.addTest(new TestSuite(ldr.loadClass(LoopbackDualAsync.class.getName())));
@@ -66,7 +66,7 @@ public class IgfsEventsTestSuite extends TestSuite {
TestSuite suite = new TestSuite("Ignite IGFS Events Test Suite Noarch Only");
- suite.addTest(new TestSuite(ldr.loadClass(LoopbackPrivate.class.getName())));
+ suite.addTest(new TestSuite(ldr.loadClass(LoopbackPrimary.class.getName())));
suite.addTest(new TestSuite(ldr.loadClass(LoopbackDualSync.class.getName())));
suite.addTest(new TestSuite(ldr.loadClass(LoopbackDualAsync.class.getName())));
@@ -76,7 +76,7 @@ public class IgfsEventsTestSuite extends TestSuite {
/**
* Shared memory IPC in PRIMARY mode.
*/
- public static class ShmemPrivate extends IgfsEventsAbstractSelfTest {
+ public static class ShmemPrimary extends IgfsEventsAbstractSelfTest {
/** {@inheritDoc} */
@Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException {
FileSystemConfiguration igfsCfg = super.getIgfsConfiguration();
@@ -95,7 +95,7 @@ public class IgfsEventsTestSuite extends TestSuite {
/**
* Loopback socket IPC in PRIMARY mode.
*/
- public static class LoopbackPrivate extends IgfsEventsAbstractSelfTest {
+ public static class LoopbackPrimary extends IgfsEventsAbstractSelfTest {
/** {@inheritDoc} */
@Override protected FileSystemConfiguration getIgfsConfiguration() throws IgniteCheckedException {
FileSystemConfiguration igfsCfg = super.getIgfsConfiguration();
[07/12] ignite git commit: IGNITE-2860: IGFS: Reworked base meta
operations.
Posted by vo...@apache.org.
IGNITE-2860: IGFS: Reworked base meta operations.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ebf40752
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ebf40752
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ebf40752
Branch: refs/heads/ignite-2813
Commit: ebf40752854cb12c5c6202ecb8546a0090482ad6
Parents: 3e53f17
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri Mar 18 16:38:45 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Mar 18 16:38:45 2016 +0300
----------------------------------------------------------------------
.../java/org/apache/ignite/igfs/IgfsPath.java | 9 +
.../internal/processors/igfs/IgfsImpl.java | 59 +-
.../processors/igfs/IgfsMetaManager.java | 1058 +++++++++---------
.../internal/processors/igfs/IgfsPathIds.java | 291 +++++
.../processors/igfs/IgfsPathsCreateResult.java | 77 ++
.../internal/processors/igfs/IgfsUtils.java | 23 +-
.../processors/igfs/IgfsAbstractSelfTest.java | 68 +-
.../igfs/IgfsMetaManagerSelfTest.java | 31 +-
.../processors/igfs/IgfsProcessorSelfTest.java | 26 +-
.../processors/igfs/IgfsStartCacheTest.java | 9 +-
.../apache/ignite/igfs/IgfsEventsTestSuite.java | 10 +-
11 files changed, 1039 insertions(+), 622 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ebf40752/modules/core/src/main/java/org/apache/ignite/igfs/IgfsPath.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsPath.java b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsPath.java
index fb0621c..bbb4efb 100644
--- a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsPath.java
+++ b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsPath.java
@@ -159,6 +159,15 @@ public final class IgfsPath implements Comparable<IgfsPath>, Externalizable {
}
/**
+ * Get path components in array form.
+ * <p>
+ * A path whose string form is a single character yields an empty array (presumably this is the
+ * root path - TODO confirm against the path normalization done on construction). Otherwise the
+ * first character is dropped and the remainder is split on {@code SLASH}.
+ *
+ * @return Components array; empty (never {@code null}) for a single-character path.
+ */
+ public String[] componentsArray() {
+ return path.length() == 1 ? new String[0] : path.substring(1).split(SLASH);
+ }
+
+ /**
* Returns the parent of a path or {@code null} if at root.
*
* @return The parent of a path or {@code null} if at root.
http://git-wip-us.apache.org/repos/asf/ignite/blob/ebf40752/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 3065427..9ec583c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -17,24 +17,6 @@
package org.apache.ignite.internal.processors.igfs;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
@@ -94,6 +76,25 @@ import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_DELETED;
import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_RENAMED;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CLOSED_READ;
@@ -1033,8 +1034,15 @@ public final class IgfsImpl implements IgfsEx {
else
dirProps = fileProps = new HashMap<>(props);
- IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2 = meta.create(path, false/*append*/, overwrite, dirProps,
- cfg.getBlockSize(), affKey, evictExclude(path, true), fileProps);
+ IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2 = meta.create(
+ path,
+ dirProps,
+ overwrite,
+ cfg.getBlockSize(),
+ affKey,
+ evictExclude(path, true),
+ fileProps
+ );
assert t2 != null;
@@ -1104,8 +1112,15 @@ public final class IgfsImpl implements IgfsEx {
else
dirProps = fileProps = new HashMap<>(props);
- IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2 = meta.create(path, true/*append*/, false/*overwrite*/,
- dirProps, cfg.getBlockSize(), null/*affKey*/, evictExclude(path, true), fileProps);
+ IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2 = meta.append(
+ path,
+ dirProps,
+ create,
+ cfg.getBlockSize(),
+ null/*affKey*/,
+ evictExclude(path, true),
+ fileProps
+ );
assert t2 != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/ebf40752/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index b4774f2..d91b0bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -17,31 +17,6 @@
package org.apache.ignite.internal.processors.igfs;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.concurrent.CountDownLatch;
-import javax.cache.processor.EntryProcessor;
-import javax.cache.processor.EntryProcessorException;
-import javax.cache.processor.MutableEntry;
-
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteInterruptedException;
@@ -82,16 +57,36 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;
-import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_CREATED;
-import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_RENAMED;
-import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CREATED;
-import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_RENAMED;
-import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_WRITE;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.CountDownLatch;
+
import static org.apache.ignite.internal.processors.igfs.IgfsFileInfo.builder;
-import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
-import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
/**
* Cache based structure (meta data) manager.
@@ -334,6 +329,51 @@ public class IgfsMetaManager extends IgfsManager {
}
/**
+ * Resolves file IDs for every component of the specified path, starting from the root.
+ * <p>
+ * The first ID is always {@code IgfsUtils.ROOT_ID}. Each subsequent ID is looked up by component name
+ * through {@code fileId(...)} using the ID of the preceding component. Resolution stops at the first
+ * component that cannot be found, so that element and all elements after it are left {@code null}.
+ *
+ * @param path Path.
+ * @return Path IDs descriptor built from the path, its parts and the resolved IDs.
+ * @throws IgniteCheckedException If failed.
+ * @throws IllegalStateException If the busy lock cannot be entered (Grid is stopping).
+ */
+ public IgfsPathIds pathIds(IgfsPath path) throws IgniteCheckedException {
+ if (busyLock.enterBusy()) {
+ try {
+ validTxState(false);
+
+ // Prepare parts: slot 0 is left unset for the root, path components follow from index 1.
+ String[] components = path.componentsArray();
+
+ String[] parts = new String[components.length + 1];
+
+ System.arraycopy(components, 0, parts, 1, components.length);
+
+ // Prepare IDs: root ID is fixed, the rest are resolved by name from the preceding element's ID.
+ IgniteUuid[] ids = new IgniteUuid[parts.length];
+
+ ids[0] = IgfsUtils.ROOT_ID;
+
+ for (int i = 1; i < ids.length; i++) {
+ IgniteUuid id = fileId(ids[i - 1], parts[i], false);
+
+ if (id != null)
+ ids[i] = id;
+ else
+ break;
+ }
+
+ // Return path, parts and IDs (IDs from the first unresolved component onwards stay null).
+ return new IgfsPathIds(path, parts, ids);
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+ else
+ throw new IllegalStateException("Failed to get file IDS because Grid is stopping: " + path);
+ }
+
+ /**
* Gets all file IDs for components of specified path possibly skipping existing transaction. Result cannot
* be empty - there is at least root element. But each element (except the first) can be {@code null} if such
* files don't exist.
@@ -823,107 +863,84 @@ public class IgfsMetaManager extends IgfsManager {
try {
validTxState(false);
- // 1. First get source and destination path IDs.
- List<IgniteUuid> srcPathIds = fileIds(srcPath);
- List<IgniteUuid> dstPathIds = fileIds(dstPath);
+ // Prepare path IDs.
+ IgfsPathIds srcPathIds = pathIds(srcPath);
+ IgfsPathIds dstPathIds = pathIds(dstPath);
- final Set<IgniteUuid> allIds = new TreeSet<>(PATH_ID_SORTING_COMPARATOR);
+ // Source path must exist.
+ if (!srcPathIds.allExists())
+ throw new IgfsPathNotFoundException("Failed to perform move because source path is not " +
+ "found: " + srcPath);
- allIds.addAll(srcPathIds);
+ // At this point we need to understand name of resulting entry. It will be either destination leaf
+ // or source leaf depending on existence.
+ String dstName;
- final IgniteUuid dstLeafId = dstPathIds.get(dstPathIds.size() - 1);
+ if (dstPathIds.lastExists())
+ // Full destination path exists -> use source name.
+ dstName = srcPathIds.lastPart();
+ else {
+ if (dstPathIds.lastParentExists()) {
+ // Destination leaf doesn't exist, but its parent does -> use destination name.
+ dstName = dstPathIds.lastPart();
- if (dstLeafId == null) {
- // Delete null entry for the unexisting destination element:
- dstPathIds.remove(dstPathIds.size() - 1);
+ dstPathIds = dstPathIds.parent();
+ }
+ else
+ // Destination parent is not found either -> exception.
+ throw new IgfsPathNotFoundException("Failed to perform move because destination path is not " +
+ "found: " + dstPath.parent());
}
- allIds.addAll(dstPathIds);
+ // Lock participating IDs.
+ final Set<IgniteUuid> lockIds = new TreeSet<>(PATH_ID_SORTING_COMPARATOR);
- if (allIds.remove(null)) {
- throw new IgfsPathNotFoundException("Failed to perform move because some path component was " +
- "not found. [src=" + srcPath + ", dst=" + dstPath + ']');
- }
+ srcPathIds.addExistingIds(lockIds);
+ dstPathIds.addExistingIds(lockIds);
- // 2. Start transaction.
IgniteInternalTx tx = startTx();
try {
- // 3. Obtain the locks.
- final Map<IgniteUuid, IgfsFileInfo> allInfos = lockIds(allIds);
+ // Obtain the locks.
+ final Map<IgniteUuid, IgfsFileInfo> lockInfos = lockIds(lockIds);
- // 4. Verify integrity of source directory.
- if (!verifyPathIntegrity(srcPath, srcPathIds, allInfos)) {
+ // Verify integrity of source and destination paths.
+ if (!srcPathIds.verifyIntegrity(lockInfos))
throw new IgfsPathNotFoundException("Failed to perform move because source directory " +
"structure changed concurrently [src=" + srcPath + ", dst=" + dstPath + ']');
- }
-
- // 5. Verify integrity of destination directory.
- final IgfsPath dstDirPath = dstLeafId != null ? dstPath : dstPath.parent();
- if (!verifyPathIntegrity(dstDirPath, dstPathIds, allInfos)) {
+ if (!dstPathIds.verifyIntegrity(lockInfos))
throw new IgfsPathNotFoundException("Failed to perform move because destination directory " +
"structure changed concurrently [src=" + srcPath + ", dst=" + dstPath + ']');
- }
-
- // 6. Calculate source and destination targets which will be changed.
- IgniteUuid srcTargetId = srcPathIds.get(srcPathIds.size() - 2);
- IgfsFileInfo srcTargetInfo = allInfos.get(srcTargetId);
- String srcName = srcPath.name();
-
- IgniteUuid dstTargetId;
- IgfsFileInfo dstTargetInfo;
- String dstName;
- if (dstLeafId != null) {
- // Destination leaf exists. Check if it is an empty directory.
- IgfsFileInfo dstLeafInfo = allInfos.get(dstLeafId);
-
- assert dstLeafInfo != null;
-
- if (dstLeafInfo.isDirectory()) {
- // Destination is a directory.
- dstTargetId = dstLeafId;
- dstTargetInfo = dstLeafInfo;
- dstName = srcPath.name();
- }
- else {
- // Error, destination is existing file.
- throw new IgfsPathAlreadyExistsException("Failed to perform move " +
- "because destination points to " +
- "existing file [src=" + srcPath + ", dst=" + dstPath + ']');
- }
- }
- else {
- // Destination leaf doesn't exist, so we operate on parent.
- dstTargetId = dstPathIds.get(dstPathIds.size() - 1);
- dstTargetInfo = allInfos.get(dstTargetId);
- dstName = dstPath.name();
- }
+ // Additional check: is destination a directory?
+ IgfsFileInfo dstParentInfo = lockInfos.get(dstPathIds.lastId());
- assert dstTargetInfo != null;
- assert dstTargetInfo.isDirectory();
+ if (dstParentInfo.isFile())
+ throw new IgfsPathAlreadyExistsException("Failed to perform move because destination points " +
+ "to existing file [src=" + srcPath + ", dst=" + dstPath + ']');
- // 7. Last check: does destination target already have listing entry with the same name?
- if (dstTargetInfo.hasChild(dstName)) {
+ // Additional check: does destination already have a child with the same name?
+ if (dstParentInfo.hasChild(dstName))
throw new IgfsPathAlreadyExistsException("Failed to perform move because destination already " +
"contains entry with the same name existing file [src=" + srcPath +
", dst=" + dstPath + ']');
- }
- // 8. Actual move: remove from source parent and add to destination target.
- IgfsListingEntry entry = srcTargetInfo.listing().get(srcName);
+ // Actual move: remove from source parent and add to destination target.
+ IgfsFileInfo srcParentInfo = lockInfos.get(srcPathIds.lastParentId());
- transferEntry(entry, srcTargetId, srcName, dstTargetId, dstName);
+ IgfsFileInfo srcInfo = lockInfos.get(srcPathIds.lastId());
+ String srcName = srcPathIds.lastPart();
+ IgfsListingEntry srcEntry = srcParentInfo.listing().get(srcName);
- tx.commit();
+ transferEntry(srcEntry, srcParentInfo.id(), srcName, dstParentInfo.id(), dstName);
- IgfsPath realNewPath = new IgfsPath(dstDirPath, dstName);
+ tx.commit();
- IgfsFileInfo moved = allInfos.get(srcPathIds.get(srcPathIds.size() - 1));
+ IgfsPath newPath = new IgfsPath(dstPathIds.path(), dstName);
// Set the new path to the info to simplify event creation:
- return IgfsFileInfo.builder(moved).path(realNewPath).build();
+ return IgfsFileInfo.builder(srcInfo).path(newPath).build();
}
finally {
tx.close();
@@ -1117,72 +1134,57 @@ public class IgfsMetaManager extends IgfsManager {
try {
validTxState(false);
- final SortedSet<IgniteUuid> allIds = new TreeSet<>(PATH_ID_SORTING_COMPARATOR);
+ IgfsPathIds pathIds = pathIds(path);
- List<IgniteUuid> pathIdList = fileIds(path);
-
- assert pathIdList.size() > 1;
+ // Continue only if the whole path present.
+ if (!pathIds.allExists())
+ return null; // A fragment of the path no longer exists.
- final IgniteUuid victimId = pathIdList.get(pathIdList.size() - 1);
+ IgniteUuid victimId = pathIds.lastId();
+ String victimName = pathIds.lastPart();
- assert !IgfsUtils.isRootOrTrashId(victimId) : "Cannot delete root or trash directories.";
+ if (IgfsUtils.isRootId(victimId))
+ throw new IgfsException("Cannot remove root directory");
- allIds.addAll(pathIdList);
+ // Prepare IDs to lock.
+ SortedSet<IgniteUuid> allIds = new TreeSet<>(PATH_ID_SORTING_COMPARATOR);
- if (allIds.remove(null))
- return null; // A fragment of the path no longer exists.
+ pathIds.addExistingIds(allIds);
IgniteUuid trashId = IgfsUtils.randomTrashId();
- boolean added = allIds.add(trashId);
- assert added;
+ allIds.add(trashId);
- final IgniteInternalTx tx = startTx();
+ IgniteInternalTx tx = startTx();
try {
- final Map<IgniteUuid, IgfsFileInfo> infoMap = lockIds(allIds);
+ // Lock participants.
+ Map<IgniteUuid, IgfsFileInfo> lockInfos = lockIds(allIds);
- // Directory starure was changed concurrently, so the original path no longer exists:
- if (!verifyPathIntegrity(path, pathIdList, infoMap))
+ // Ensure that all participants are still in place.
+ if (!pathIds.verifyIntegrity(lockInfos))
return null;
- final IgfsFileInfo victimInfo = infoMap.get(victimId);
+ IgfsFileInfo victimInfo = lockInfos.get(victimId);
+ // Cannot delete non-empty directory if recursive flag is not set.
if (!recursive && victimInfo.hasChildren())
- // Throw exception if not empty and not recursive.
throw new IgfsDirectoryNotEmptyException("Failed to remove directory (directory is not " +
"empty and recursive flag is not set).");
- IgfsFileInfo destInfo = infoMap.get(trashId);
-
- assert destInfo != null;
+ // Prepare trash data.
+ IgfsFileInfo trashInfo = lockInfos.get(trashId);
+ final String trashName = victimId.toString();
- final String srcFileName = path.name();
+ assert !trashInfo.hasChild(trashName) : "Failed to add file name into the " +
+ "destination directory (file already exists) [destName=" + trashName + ']';
- final String destFileName = victimId.toString();
+ IgniteUuid parentId = pathIds.lastParentId();
+ IgfsFileInfo parentInfo = lockInfos.get(parentId);
- assert !destInfo.hasChild(destFileName) : "Failed to add file name into the " +
- "destination directory (file already exists) [destName=" + destFileName + ']';
-
- IgfsFileInfo srcParentInfo = infoMap.get(pathIdList.get(pathIdList.size() - 2));
-
- assert srcParentInfo != null;
-
- IgniteUuid srcParentId = srcParentInfo.id();
- assert srcParentId.equals(pathIdList.get(pathIdList.size() - 2));
-
- IgfsListingEntry srcEntry = srcParentInfo.listing().get(srcFileName);
-
- assert srcEntry != null : "Deletion victim not found in parent listing [path=" + path +
- ", name=" + srcFileName + ", listing=" + srcParentInfo.listing() + ']';
-
- assert victimId.equals(srcEntry.fileId());
-
- transferEntry(srcEntry, srcParentId, srcFileName, trashId, destFileName);
+ transferEntry(parentInfo.listing().get(victimName), parentId, victimName, trashId, trashName);
if (victimInfo.isFile())
- // Update a file info of the removed file with a file path,
- // which will be used by delete worker for event notifications.
invokeUpdatePath(victimId, path);
tx.commit();
@@ -1647,74 +1649,57 @@ public class IgfsMetaManager extends IgfsManager {
*
* @param path The path to create.
* @param props The properties to use for created directories.
- * @return True iff a directory was created during the operation.
+ * @return True if a directory was created during the operation.
* @throws IgniteCheckedException If a non-directory file exists on the requested path, and in case of other errors.
*/
boolean mkdirs(final IgfsPath path, final Map<String, String> props) throws IgniteCheckedException {
- assert props != null;
validTxState(false);
- DirectoryChainBuilder b = null;
-
while (true) {
if (busyLock.enterBusy()) {
try {
- b = new DirectoryChainBuilder(path, props);
+ // Prepare path IDs.
+ IgfsPathIds pathIds = pathIds(path);
+
+ // Prepare lock IDs. Essentially, they consist of two parts: existing IDs and potential new IDs.
+ Set<IgniteUuid> lockIds = new TreeSet<>(PATH_ID_SORTING_COMPARATOR);
+
+ pathIds.addExistingIds(lockIds);
+ pathIds.addSurrogateIds(lockIds);
+
+ assert lockIds.size() == pathIds.count();
// Start TX.
IgniteInternalTx tx = startTx();
try {
- final Map<IgniteUuid, IgfsFileInfo> lockedInfos = lockIds(b.idSet);
+ final Map<IgniteUuid, IgfsFileInfo> lockInfos = lockIds(lockIds);
- // If the path was changed, we close the current Tx and repeat the procedure again
- // starting from taking the path ids.
- if (verifyPathIntegrity(b.existingPath, b.idList, lockedInfos)) {
- // Locked path okay, trying to proceed with the remainder creation.
- IgfsFileInfo lowermostExistingInfo = lockedInfos.get(b.lowermostExistingId);
+ if (!pathIds.verifyIntegrity(lockInfos))
+ // Directory structure changed concurrently. So we simply re-try.
+ continue;
- // Check only the lowermost directory in the existing directory chain
- // because others are already checked in #verifyPathIntegrity() above.
- if (!lowermostExistingInfo.isDirectory())
+ // Check if the whole structure is already in place.
+ if (pathIds.allExists()) {
+ if (lockInfos.get(pathIds.lastExistingId()).isDirectory())
+ return false;
+ else
throw new IgfsParentNotDirectoryException("Failed to create directory (parent " +
"element is not a directory)");
+ }
- if (b.existingIdCnt == b.components.size() + 1) {
- assert b.existingPath.equals(path);
- assert lockedInfos.size() == b.existingIdCnt;
-
- // The target directory already exists, nothing to do.
- // (The fact that all the path consisns of directories is already checked above).
- // Note that properties are not updated in this case.
- return false;
- }
-
- Map<String, IgfsListingEntry> parentListing = lowermostExistingInfo.listing();
-
- String shortName = b.components.get(b.existingIdCnt - 1);
-
- IgfsListingEntry entry = parentListing.get(shortName);
+ IgfsPathsCreateResult res = createDirectory(pathIds, lockInfos, props);
- if (entry == null) {
- b.doBuild();
+ if (res == null)
+ continue;
- tx.commit();
+ // Commit TX.
+ tx.commit();
- break;
- }
- else {
- // Another thread created file or directory with the same name.
- if (!entry.isDirectory()) {
- // Entry exists, and it is not a directory:
- throw new IgfsParentNotDirectoryException("Failed to create directory (parent " +
- "element is not a directory)");
- }
+ generateCreateEvents(res.createdPaths(), false);
- // If this is a directory, we continue the repeat loop,
- // because we cannot lock this directory without
- // lock ordering rule violation.
- }
- }
+ // We are done.
+ return true;
}
finally {
tx.close();
@@ -1727,12 +1712,6 @@ public class IgfsMetaManager extends IgfsManager {
else
throw new IllegalStateException("Failed to mkdir because Grid is stopping. [path=" + path + ']');
}
-
- assert b != null;
-
- b.sendEvents();
-
- return true;
}
/**
@@ -1960,11 +1939,12 @@ public class IgfsMetaManager extends IgfsManager {
parentInfo = synchronize(fs, parentPath, parentInfo, parent0, true, null);
// Fire notification about missing directories creation.
- if (evts.isRecordable(EVT_IGFS_DIR_CREATED)) {
+ if (evts.isRecordable(EventType.EVT_IGFS_DIR_CREATED)) {
IgfsPath evtPath = parent0;
while (!parentPath.equals(evtPath)) {
- pendingEvts.addFirst(new IgfsEvent(evtPath, locNode, EVT_IGFS_DIR_CREATED));
+ pendingEvts.addFirst(new IgfsEvent(evtPath, locNode,
+ EventType.EVT_IGFS_DIR_CREATED));
evtPath = evtPath.parent();
@@ -2010,8 +1990,8 @@ public class IgfsMetaManager extends IgfsManager {
}
// Record CREATE event if needed.
- if (evts.isRecordable(EVT_IGFS_FILE_CREATED))
- pendingEvts.add(new IgfsEvent(path, locNode, EVT_IGFS_FILE_CREATED));
+ if (evts.isRecordable(EventType.EVT_IGFS_FILE_CREATED))
+ pendingEvts.add(new IgfsEvent(path, locNode, EventType.EVT_IGFS_FILE_CREATED));
return new IgfsSecondaryOutputStreamDescriptor(parentInfo.id(), newInfo, out);
}
@@ -2285,11 +2265,11 @@ public class IgfsMetaManager extends IgfsManager {
synchronize(fs, parentPath, parentPathInfo, path, true, null);
- if (evts.isRecordable(EVT_IGFS_DIR_CREATED)) {
+ if (evts.isRecordable(EventType.EVT_IGFS_DIR_CREATED)) {
IgfsPath evtPath = path;
while (!parentPath.equals(evtPath)) {
- pendingEvts.addFirst(new IgfsEvent(evtPath, locNode, EVT_IGFS_DIR_CREATED));
+ pendingEvts.addFirst(new IgfsEvent(evtPath, locNode, EventType.EVT_IGFS_DIR_CREATED));
evtPath = evtPath.parent();
@@ -2386,15 +2366,15 @@ public class IgfsMetaManager extends IgfsManager {
// Record event if needed.
if (srcInfo.isFile()) {
- if (evts.isRecordable(EVT_IGFS_FILE_RENAMED))
+ if (evts.isRecordable(EventType.EVT_IGFS_FILE_RENAMED))
pendingEvts.add(new IgfsEvent(
src,
destInfo == null ? dest : new IgfsPath(dest, src.name()),
locNode,
- EVT_IGFS_FILE_RENAMED));
+ EventType.EVT_IGFS_FILE_RENAMED));
}
- else if (evts.isRecordable(EVT_IGFS_DIR_RENAMED))
- pendingEvts.add(new IgfsEvent(src, dest, locNode, EVT_IGFS_DIR_RENAMED));
+ else if (evts.isRecordable(EventType.EVT_IGFS_DIR_RENAMED))
+ pendingEvts.add(new IgfsEvent(src, dest, locNode, EventType.EVT_IGFS_DIR_RENAMED));
return true;
}
@@ -2896,7 +2876,7 @@ public class IgfsMetaManager extends IgfsManager {
* @return Transaction.
*/
private IgniteInternalTx startTx() {
- return metaCache.txStartEx(PESSIMISTIC, REPEATABLE_READ);
+ return metaCache.txStartEx(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ);
}
/**
@@ -3178,6 +3158,14 @@ public class IgfsMetaManager extends IgfsManager {
private IgfsListingEntry entry;
/**
+ * Empty constructor required for {@link Externalizable}.
+ *
+ */
+ public ListingAddProcessor() {
+ // No-op.
+ }
+
+ /**
* Constructs update directory listing closure.
*
* @param fileName File name to add into parent listing.
@@ -3191,19 +3179,10 @@ public class IgfsMetaManager extends IgfsManager {
this.entry = entry;
}
- /**
- * Empty constructor required for {@link Externalizable}.
- *
- */
- public ListingAddProcessor() {
- // No-op.
- }
-
/** {@inheritDoc} */
@Override public Void process(MutableEntry<IgniteUuid, IgfsFileInfo> e, Object... args) {
IgfsFileInfo fileInfo = e.getValue();
- assert fileInfo != null : "File info not found for the child: " + entry.fileId();
assert fileInfo.isDirectory();
Map<String, IgfsListingEntry> listing = new HashMap<>(fileInfo.listing());
@@ -3240,6 +3219,73 @@ public class IgfsMetaManager extends IgfsManager {
}
/**
+ * Listing replace processor.
+ */
+ private static final class ListingReplaceProcessor implements EntryProcessor<IgniteUuid, IgfsFileInfo, Void>,
+ Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Name. */
+ private String name;
+
+ /** New ID. */
+ private IgniteUuid id;
+
+ /**
+ * Empty constructor required for {@link Externalizable}.
+ */
+ public ListingReplaceProcessor() {
+ // No-op.
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param name Name.
+ * @param id ID.
+ */
+ public ListingReplaceProcessor(String name, IgniteUuid id) {
+ this.name = name;
+ this.id = id;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void process(MutableEntry<IgniteUuid, IgfsFileInfo> e, Object... args)
+ throws EntryProcessorException {
+ IgfsFileInfo fileInfo = e.getValue();
+
+ assert fileInfo.isDirectory();
+
+ Map<String, IgfsListingEntry> listing = new HashMap<>(fileInfo.listing());
+
+ // Replace the entry in the copied listing, preserving its directory flag.
+ IgfsListingEntry oldEntry = listing.get(name);
+
+ if (oldEntry == null)
+ throw new IgniteException("Directory listing doesn't contain expected entry: " + name);
+
+ listing.put(name, new IgfsListingEntry(id, oldEntry.isDirectory()));
+
+ e.setValue(new IgfsFileInfo(listing, fileInfo));
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ U.writeString(out, name);
+ out.writeObject(id);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ name = U.readString(in);
+ id = (IgniteUuid)in.readObject();
+ }
+ }
+
+ /**
* Update path closure.
*/
@GridInternal
@@ -3291,170 +3337,92 @@ public class IgfsMetaManager extends IgfsManager {
}
/**
- * Create a new file.
+ * Append routine.
*
* @param path Path.
- * @param bufSize Buffer size.
- * @param overwrite Overwrite flag.
+ * @param dirProps Directory properties.
+ * @param create Create flag.
+ * @param blockSize Block size.
* @param affKey Affinity key.
- * @param replication Replication factor.
- * @param props Properties.
- * @param simpleCreate Whether new file should be created in secondary FS using create(Path, boolean) method.
- * @return Tuple containing the created file info and its parent id.
+ * @param evictExclude Evict exclude flag.
+ * @param fileProps File properties.
+ * @return Tuple containing the file info and its parent id.
+ * @throws IgniteCheckedException If failed.
*/
- IgniteBiTuple<IgfsFileInfo, IgniteUuid> create(
+ IgniteBiTuple<IgfsFileInfo, IgniteUuid> append(
final IgfsPath path,
- final boolean append,
- final boolean overwrite,
Map<String, String> dirProps,
+ final boolean create,
final int blockSize,
final @Nullable IgniteUuid affKey,
final boolean evictExclude,
@Nullable Map<String, String> fileProps) throws IgniteCheckedException {
validTxState(false);
- assert path != null;
-
- final String name = path.name();
-
- DirectoryChainBuilder b = null;
-
- IgniteUuid trashId = IgfsUtils.randomTrashId();
-
while (true) {
if (busyLock.enterBusy()) {
try {
- b = new DirectoryChainBuilder(path, dirProps, fileProps, blockSize, affKey, evictExclude);
-
- // Start Tx:
- IgniteInternalTx tx = startTx();
-
- try {
- if (overwrite)
- // Lock also the TRASH directory because in case of overwrite we
- // may need to delete the old file:
- b.idSet.add(trashId);
-
- final Map<IgniteUuid, IgfsFileInfo> lockedInfos = lockIds(b.idSet);
-
- assert !overwrite || lockedInfos.get(trashId) != null; // TRASH must exist at this point.
-
- // If the path was changed, we close the current Tx and repeat the procedure again
- // starting from taking the path ids.
- if (verifyPathIntegrity(b.existingPath, b.idList, lockedInfos)) {
- // Locked path okay, trying to proceed with the remainder creation.
- final IgfsFileInfo lowermostExistingInfo = lockedInfos.get(b.lowermostExistingId);
+ // Prepare path IDs.
+ IgfsPathIds pathIds = pathIds(path);
- if (b.existingIdCnt == b.components.size() + 1) {
- // Full requestd path exists.
+ // Fail-fast: create flag is not specified and some paths are missing.
+ if (!pathIds.allExists() && !create)
+ throw new IgfsPathNotFoundException("Failed to append because file is not found: " + path);
- assert b.existingPath.equals(path);
- assert lockedInfos.size() ==
- (overwrite ? b.existingIdCnt + 1/*TRASH*/ : b.existingIdCnt);
-
- if (lowermostExistingInfo.isDirectory()) {
- throw new IgfsPathAlreadyExistsException("Failed to "
- + (append ? "open" : "create") + " file (path points to an " +
- "existing directory): " + path);
- }
- else {
- // This is a file.
- assert lowermostExistingInfo.isFile();
-
- final IgniteUuid parentId = b.idList.get(b.idList.size() - 2);
-
- final IgniteUuid lockId = lowermostExistingInfo.lockId();
-
- if (append) {
- if (lockId != null)
- throw fsException("Failed to open file (file is opened for writing) "
- + "[fileName=" + name + ", fileId=" + lowermostExistingInfo.id()
- + ", lockId=" + lockId + ']');
-
- IgfsFileInfo lockedInfo = invokeLock(lowermostExistingInfo.id(), false);
-
- IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2 = new T2<>(lockedInfo, parentId);
-
- tx.commit();
-
- IgfsUtils.sendEvents(igfsCtx.kernalContext(), path,
- EventType.EVT_IGFS_FILE_OPENED_WRITE);
-
- return t2;
- }
- else if (overwrite) {
- // Delete existing file, but fail if it is locked:
- if (lockId != null)
- throw fsException("Failed to overwrite file (file is opened for writing) " +
- "[fileName=" + name + ", fileId=" + lowermostExistingInfo.id()
- + ", lockId=" + lockId + ']');
+ // Prepare lock IDs.
+ Set<IgniteUuid> lockIds = new TreeSet<>(PATH_ID_SORTING_COMPARATOR);
- final IgfsListingEntry deletedEntry = lockedInfos.get(parentId).listing()
- .get(name);
+ pathIds.addExistingIds(lockIds);
+ pathIds.addSurrogateIds(lockIds);
- assert deletedEntry != null;
-
- transferEntry(deletedEntry, parentId, name, trashId,
- lowermostExistingInfo.id().toString());
-
- // Update a file info of the removed file with a file path,
- // which will be used by delete worker for event notifications.
- invokeUpdatePath(lowermostExistingInfo.id(), path);
-
- // Make a new locked info:
- long t = System.currentTimeMillis();
-
- final IgfsFileInfo newFileInfo = new IgfsFileInfo(cfg.getBlockSize(), 0L,
- affKey, createFileLockId(false), evictExclude, fileProps, t, t);
-
- assert newFileInfo.lockId() != null; // locked info should be created.
-
- createNewEntry(newFileInfo, parentId, name);
-
- IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2 = new T2<>(newFileInfo, parentId);
+ // Start TX.
+ IgniteInternalTx tx = startTx();
- tx.commit();
+ try {
+ Map<IgniteUuid, IgfsFileInfo> lockInfos = lockIds(lockIds);
- delWorker.signal();
+ if (!pathIds.verifyIntegrity(lockInfos))
+ // Directory structure changed concurrently. So we simply re-try.
+ continue;
- IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_WRITE);
+ if (pathIds.allExists()) {
+ // All participants are found. Simply open the stream.
+ IgfsFileInfo info = lockInfos.get(pathIds.lastId());
- return t2;
- }
- else {
- throw new IgfsPathAlreadyExistsException("Failed to create file (file " +
- "already exists and overwrite flag is false): " + path);
- }
- }
- }
+ // Check: is it a file?
+ if (!info.isFile())
+ throw new IgfsPathIsDirectoryException("Failed to open file for write." + path);
- // The full requested path does not exist.
+ // Check if file already opened for write.
+ if (info.lockId() != null)
+ throw new IgfsException("File is already opened for write: " + path);
- // Check only the lowermost directory in the existing directory chain
- // because others are already checked in #verifyPathIntegrity() above.
- if (!lowermostExistingInfo.isDirectory())
- throw new IgfsParentNotDirectoryException("Failed to " + (append ? "open" : "create" )
- + " file (parent element is not a directory)");
+ // At this point we can open the stream safely.
+ info = invokeLock(info.id(), false);
- final String uppermostFileToBeCreatedName = b.components.get(b.existingIdCnt - 1);
+ IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2 = new T2<>(info, pathIds.lastParentId());
- if (!lowermostExistingInfo.hasChild(uppermostFileToBeCreatedName)) {
- b.doBuild();
+ tx.commit();
- assert b.leafInfo != null;
- assert b.leafParentId != null;
+ IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EventType.EVT_IGFS_FILE_OPENED_WRITE);
- IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2 = new T2<>(b.leafInfo, b.leafParentId);
+ return t2;
+ }
+ else {
+ // Create file and parent folders.
+ IgfsPathsCreateResult res =
+ createFile(pathIds, lockInfos, dirProps, fileProps, blockSize, affKey, evictExclude);
- tx.commit();
+ if (res == null)
+ continue;
- b.sendEvents();
+ // Commit.
+ tx.commit();
- return t2;
- }
+ // Generate events.
+ generateCreateEvents(res.createdPaths(), true);
- // Another thread concurrently created file or directory in the path with
- // the name we need.
+ return new T2<>(res.info(), res.parentId());
}
}
finally {
@@ -3464,224 +3432,292 @@ public class IgfsMetaManager extends IgfsManager {
finally {
busyLock.leaveBusy();
}
- } else
- throw new IllegalStateException("Failed to mkdir because Grid is stopping. [path=" + path + ']');
+ }
+ else
+ throw new IllegalStateException("Failed to append for file because Grid is stopping:" + path);
}
}
- /** File chain builder. */
- private class DirectoryChainBuilder {
- /** The requested path to be created. */
- private final IgfsPath path;
+ /**
+ * Create a new file.
+ *
+ * @param path Path.
+ * @param dirProps Directory properties.
+ * @param overwrite Overwrite flag.
+ * @param blockSize Block size.
+ * @param affKey Affinity key.
+ * @param evictExclude Evict exclude flag.
+ * @param fileProps File properties.
+ * @return Tuple containing the created file info and its parent id.
+ */
+ IgniteBiTuple<IgfsFileInfo, IgniteUuid> create(
+ final IgfsPath path,
+ Map<String, String> dirProps,
+ final boolean overwrite,
+ final int blockSize,
+ final @Nullable IgniteUuid affKey,
+ final boolean evictExclude,
+ @Nullable Map<String, String> fileProps) throws IgniteCheckedException {
+ validTxState(false);
- /** Full path components. */
- private final List<String> components;
+ while (true) {
+ if (busyLock.enterBusy()) {
+ try {
+ // Prepare path IDs.
+ IgfsPathIds pathIds = pathIds(path);
- /** The list of ids. */
- private final List<IgniteUuid> idList;
+ // Prepare lock IDs.
+ Set<IgniteUuid> lockIds = new TreeSet<>(PATH_ID_SORTING_COMPARATOR);
- /** The set of ids. */
- private final SortedSet<IgniteUuid> idSet = new TreeSet<IgniteUuid>(PATH_ID_SORTING_COMPARATOR);
+ pathIds.addExistingIds(lockIds);
+ pathIds.addSurrogateIds(lockIds);
- /** The middle node properties. */
- private final Map<String, String> props;
+ // In overwrite mode we also lock ID of potential replacement as well as trash ID.
+ IgniteUuid overwriteId = IgniteUuid.randomUuid();
+ IgniteUuid trashId = IgfsUtils.randomTrashId();
- /** The leaf node properties. */
- private final Map<String, String> leafProps;
+ if (overwrite) {
+ lockIds.add(overwriteId);
- /** The lowermost exsiting path id. */
- private final IgniteUuid lowermostExistingId;
+ // Trash ID is only added if we suspect conflict.
+ if (pathIds.allExists())
+ lockIds.add(trashId);
+ }
- /** The existing path. */
- private final IgfsPath existingPath;
+ // Start TX.
+ IgniteInternalTx tx = startTx();
- /** The created leaf info. */
- private IgfsFileInfo leafInfo;
+ try {
+ Map<IgniteUuid, IgfsFileInfo> lockInfos = lockIds(lockIds);
- /** The leaf parent id. */
- private IgniteUuid leafParentId;
+ if (!pathIds.verifyIntegrity(lockInfos))
+ // Directory structure changed concurrently. So we simply re-try.
+ continue;
- /** The number of existing ids. */
- private final int existingIdCnt;
+ if (pathIds.allExists()) {
+ // All participants found.
+ IgfsFileInfo oldInfo = lockInfos.get(pathIds.lastId());
- /** Whether laef is directory. */
- private final boolean leafDir;
+ // Check: is it a file?
+ if (!oldInfo.isFile())
+ throw new IgfsPathIsDirectoryException("Failed to create a file: " + path);
- /** Block size. */
- private final int blockSize;
+ // Check: can we overwrite it?
+ if (!overwrite)
+ throw new IgfsPathAlreadyExistsException("Failed to create a file: " + path);
- /** Affinity key. */
- private final IgniteUuid affKey;
+ // Check if file already opened for write.
+ if (oldInfo.lockId() != null)
+ throw new IgfsException("File is already opened for write: " + path);
- /** Evict exclude flag. */
- private final boolean evictExclude;
+ // At this point file can be re-created safely.
- /**
- * Constructor for directories.
- *
- * @param path Path.
- * @param props Properties.
- * @throws IgniteCheckedException If failed.
- */
- protected DirectoryChainBuilder(IgfsPath path, Map<String, String> props) throws IgniteCheckedException {
- this(path, props, props, true, 0, null, false);
- }
+ // First step: add existing to trash listing.
+ IgniteUuid oldId = pathIds.lastId();
- /**
- * Constructor for files.
- *
- * @param path Path.
- * @param dirProps Directory properties.
- * @param fileProps File properties.
- * @param blockSize Block size.
- * @param affKey Affinity key (optional).
- * @param evictExclude Evict exclude flag.
- * @throws IgniteCheckedException If failed.
- */
- protected DirectoryChainBuilder(IgfsPath path, Map<String, String> dirProps, Map<String, String> fileProps,
- int blockSize, @Nullable IgniteUuid affKey, boolean evictExclude)
- throws IgniteCheckedException {
- this(path, dirProps, fileProps, false, blockSize, affKey, evictExclude);
- }
+ id2InfoPrj.invoke(trashId, new ListingAddProcessor(oldId.toString(),
+ new IgfsListingEntry(oldId, true)));
- /**
- * Constructor.
- *
- * @param path Path.
- * @param props Middle properties.
- * @param leafProps Leaf properties.
- * @param leafDir Whether leaf is directory or file.
- * @param blockSize Block size.
- * @param affKey Affinity key (optional).
- * @param evictExclude Evict exclude flag.
- * @throws IgniteCheckedException If failed.
- */
- private DirectoryChainBuilder(IgfsPath path, Map<String,String> props, Map<String,String> leafProps,
- boolean leafDir, int blockSize, @Nullable IgniteUuid affKey, boolean evictExclude)
- throws IgniteCheckedException {
- this.path = path;
- this.components = path.components();
- this.idList = fileIds(path);
- this.props = props;
- this.leafProps = leafProps;
- this.leafDir = leafDir;
- this.blockSize = blockSize;
- this.affKey = affKey;
- this.evictExclude = evictExclude;
+ // Second step: replace ID in parent directory.
+ String name = pathIds.lastPart();
+ IgniteUuid parentId = pathIds.lastParentId();
- // Store all the non-null ids in the set & construct existing path in one loop:
- IgfsPath existingPath = path.root();
+ id2InfoPrj.invoke(parentId, new ListingReplaceProcessor(name, overwriteId));
- assert idList.size() == components.size() + 1;
+ // Third step: create the file.
+ long createTime = System.currentTimeMillis();
- // Find the lowermost existing id:
- IgniteUuid lowermostExistingId = null;
+ IgfsFileInfo newInfo = invokeAndGet(overwriteId, new FileCreateProcessor(createTime,
+ fileProps, blockSize, affKey, createFileLockId(false), evictExclude));
- int idIdx = 0;
+ // Fourth step: update path of removed file.
+ invokeUpdatePath(oldId, path);
- for (IgniteUuid id : idList) {
- if (id == null)
- break;
+ // Prepare result and commit.
+ IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2 = new T2<>(newInfo, parentId);
- lowermostExistingId = id;
+ tx.commit();
- boolean added = idSet.add(id);
+ IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EventType.EVT_IGFS_FILE_OPENED_WRITE);
- assert added : "Not added id = " + id;
+ return t2;
+ }
+ else {
+ // Create file and parent folders.
+ IgfsPathsCreateResult res =
+ createFile(pathIds, lockInfos, dirProps, fileProps, blockSize, affKey, evictExclude);
- if (idIdx >= 1) // skip root.
- existingPath = new IgfsPath(existingPath, components.get(idIdx - 1));
+ if (res == null)
+ continue;
- idIdx++;
- }
+ // Commit.
+ tx.commit();
- assert idSet.contains(IgfsUtils.ROOT_ID);
+ // Generate events.
+ generateCreateEvents(res.createdPaths(), true);
- this.lowermostExistingId = lowermostExistingId;
+ return new T2<>(res.info(), res.parentId());
+ }
+ }
+ finally {
+ tx.close();
+ }
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+ else
+ throw new IllegalStateException("Failed to mkdir because Grid is stopping. [path=" + path + ']');
+ }
+ }
- this.existingPath = existingPath;
+ /**
+ * Create directory and its parents.
+ *
+ * @param pathIds Path IDs.
+ * @param lockInfos Lock infos.
+ * @param dirProps Directory properties.
+ * @return Result or {@code null} if the first parent already contained child with the same name.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable IgfsPathsCreateResult createDirectory(IgfsPathIds pathIds, Map<IgniteUuid, IgfsFileInfo> lockInfos,
+ Map<String, String> dirProps) throws IgniteCheckedException {
+ // Check if entry we are going to write to is directory.
+ if (lockInfos.get(pathIds.lastExistingId()).isFile())
+ throw new IgfsParentNotDirectoryException("Failed to create directory (parent " +
+ "element is not a directory)");
+
+ return createFileOrDirectory(true, pathIds, lockInfos, dirProps, null, 0, null, false);
+ }
- this.existingIdCnt = idSet.size();
- }
+ /**
+ * Create file and all its parents.
+ *
+ * @param pathIds Paths IDs.
+ * @param lockInfos Lock infos.
+ * @param dirProps Directory properties.
+ * @param fileProps File properties.
+ * @param blockSize Block size.
+ * @param affKey Affinity key (optional)
+ * @param evictExclude Evict exclude flag.
+ * @return Result or {@code null} if the first parent already contained child with the same name.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable private IgfsPathsCreateResult createFile(IgfsPathIds pathIds, Map<IgniteUuid, IgfsFileInfo> lockInfos,
+ Map<String, String> dirProps, Map<String, String> fileProps, int blockSize, @Nullable IgniteUuid affKey,
+ boolean evictExclude) throws IgniteCheckedException{
+ // Check if entry we are going to write to is directory.
+ if (lockInfos.get(pathIds.lastExistingId()).isFile())
+ throw new IgfsParentNotDirectoryException("Failed to open file for write " +
+ "(parent element is not a directory): " + pathIds.path());
+
+ return createFileOrDirectory(false, pathIds, lockInfos, dirProps, fileProps, blockSize, affKey, evictExclude);
+ }
- /**
- * Does the main portion of job building the renmaining path.
- */
- public final void doBuild() throws IgniteCheckedException {
- // Fix current time. It will be used in all created entities.
- long createTime = System.currentTimeMillis();
+ /**
+ * Create file or directory.
+ *
+ * @param dir Directory flag.
+ * @param pathIds Path IDs.
+ * @param lockInfos Lock infos.
+ * @param dirProps Directory properties.
+ * @param fileProps File properties.
+ * @param blockSize Block size.
+ * @param affKey Affinity key.
+ * @param evictExclude Evict exclude flag.
+ * @return Result or {@code null} if the last existing element already contains a child with the same name.
+ * @throws IgniteCheckedException If failed.
+ */
+ private IgfsPathsCreateResult createFileOrDirectory(boolean dir, IgfsPathIds pathIds,
+ Map<IgniteUuid, IgfsFileInfo> lockInfos, Map<String, String> dirProps, Map<String, String> fileProps,
+ int blockSize, @Nullable IgniteUuid affKey, boolean evictExclude) throws IgniteCheckedException {
+ // This is our starting point.
+ int lastExistingIdx = pathIds.lastExistingIndex();
+ IgfsFileInfo lastExistingInfo = lockInfos.get(pathIds.lastExistingId());
+
+ // If current info already contains entry with the same name as its child, then something
+ // has changed concurrently. We must re-try because we cannot get info of this unexpected
+ // element due to possible deadlocks.
+ int curIdx = lastExistingIdx + 1;
+
+ String curPart = pathIds.part(curIdx);
+ IgniteUuid curId = pathIds.surrogateId(curIdx);
+ IgniteUuid curParentId = lastExistingInfo.id();
+
+ if (lastExistingInfo.hasChild(curPart))
+ return null;
- IgfsListingEntry childInfo = null;
- String childName = null;
+ // First step: add new entry to the last existing element.
+ id2InfoPrj.invoke(lastExistingInfo.id(), new ListingAddProcessor(curPart,
+ new IgfsListingEntry(curId, dir || !pathIds.isLastIndex(curIdx))));
- IgniteUuid parentId = null;
+ // Events support.
+ IgfsPath lastCreatedPath = pathIds.lastExistingPath();
- // This loop creates the missing directory chain from the bottom to the top:
- for (int i = components.size() - 1; i >= existingIdCnt - 1; i--) {
- IgniteUuid childId = IgniteUuid.randomUuid();
- boolean childDir;
+ List<IgfsPath> createdPaths = new ArrayList<>(pathIds.count() - curIdx);
- if (childName == null) {
- assert childInfo == null;
+ // Second step: create middle directories.
+ long createTime = System.currentTimeMillis();
- if (leafDir) {
- childDir = true;
+ while (curIdx < pathIds.count() - 1) {
+ int nextIdx = curIdx + 1;
- leafInfo = invokeAndGet(childId, new DirectoryCreateProcessor(createTime, leafProps));
- }
- else {
- childDir = false;
+ String nextPart = pathIds.part(nextIdx);
+ IgniteUuid nextId = pathIds.surrogateId(nextIdx);
- leafInfo = invokeAndGet(childId, new FileCreateProcessor(createTime, leafProps, blockSize,
- affKey, createFileLockId(false), evictExclude));
- }
- }
- else {
- assert childInfo != null;
+ id2InfoPrj.invoke(curId, new DirectoryCreateProcessor(createTime, dirProps,
+ nextPart, new IgfsListingEntry(nextId, dir || !pathIds.isLastIndex(nextIdx))));
- childDir = true;
+ // Save event.
+ lastCreatedPath = new IgfsPath(lastCreatedPath, curPart);
- id2InfoPrj.invoke(childId, new DirectoryCreateProcessor(createTime, props, childName, childInfo));
+ createdPaths.add(lastCreatedPath);
- if (parentId == null)
- parentId = childId;
- }
+ // Advance things further.
+ curIdx++;
- childInfo = new IgfsListingEntry(childId, childDir);
+ curParentId = curId;
- childName = components.get(i);
- }
+ curPart = nextPart;
+ curId = nextId;
+ }
- if (parentId == null)
- parentId = lowermostExistingId;
+ // Third step: create leaf.
+ IgfsFileInfo info;
- leafParentId = parentId;
+ if (dir)
+ info = invokeAndGet(curId, new DirectoryCreateProcessor(createTime, dirProps));
+ else
+ info = invokeAndGet(curId, new FileCreateProcessor(createTime, fileProps,
+ blockSize, affKey, createFileLockId(false), evictExclude));
- // Now link the newly created directory chain to the lowermost existing parent:
- id2InfoPrj.invoke(lowermostExistingId, new ListingAddProcessor(childName, childInfo));
- }
+ createdPaths.add(pathIds.path());
- /**
- * Sends events.
- */
- public final void sendEvents() {
- if (evts.isRecordable(EVT_IGFS_DIR_CREATED)) {
- IgfsPath createdPath = existingPath;
+ return new IgfsPathsCreateResult(createdPaths, info, curParentId);
+ }
- for (int i = existingPath.components().size(); i < components.size() - 1; i++) {
- createdPath = new IgfsPath(createdPath, components.get(i));
+ /**
+ * Generate events for created file or directory.
+ *
+ * @param createdPaths Created paths.
+ * @param file Whether file was created.
+ */
+ private void generateCreateEvents(List<IgfsPath> createdPaths, boolean file) {
+ if (evts.isRecordable(EventType.EVT_IGFS_DIR_CREATED)) {
+ for (int i = 0; i < createdPaths.size() - 1; i++)
+ IgfsUtils.sendEvents(igfsCtx.kernalContext(), createdPaths.get(i),
+ EventType.EVT_IGFS_DIR_CREATED);
+ }
- IgfsUtils.sendEvents(igfsCtx.kernalContext(), createdPath, EVT_IGFS_DIR_CREATED);
- }
- }
+ IgfsPath leafPath = createdPaths.get(createdPaths.size() - 1);
- if (leafDir)
- IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_DIR_CREATED);
- else {
- IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_CREATED);
- IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_WRITE);
- }
+ if (file) {
+ IgfsUtils.sendEvents(igfsCtx.kernalContext(), leafPath, EventType.EVT_IGFS_FILE_CREATED);
+ IgfsUtils.sendEvents(igfsCtx.kernalContext(), leafPath, EventType.EVT_IGFS_FILE_OPENED_WRITE);
}
+ else
+ IgfsUtils.sendEvents(igfsCtx.kernalContext(), leafPath, EventType.EVT_IGFS_DIR_CREATED);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/ebf40752/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathIds.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathIds.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathIds.java
new file mode 100644
index 0000000..1f669b0
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathIds.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Path IDs abstraction. Contains path and corresponding IDs.
+ */
+public class IgfsPathIds {
+ /** Original path. */
+ private final IgfsPath path;
+
+ /** Path parts. */
+ private final String[] parts;
+
+ /** IDs. */
+ private final IgniteUuid[] ids;
+
+ /** Surrogate IDs for paths which don't exist yet. Initialized on demand. */
+ private IgniteUuid[] surrogateIds;
+
+ /** Index of last existing ID. */
+ private final int lastExistingIdx;
+
+ /**
+ * Constructor.
+ *
+ * @param path Path.
+ * @param parts Path parts.
+ * @param ids IDs.
+ */
+ public IgfsPathIds(IgfsPath path, String[] parts, IgniteUuid[] ids) {
+ assert path != null;
+ assert parts.length == ids.length;
+
+ this.path = path;
+ this.parts = parts;
+ this.ids = ids;
+
+ int lastExistingIdx0 = -1;
+
+ for (int i = parts.length - 1; i >= 0; i--) {
+ if (ids[i] != null) {
+ lastExistingIdx0 = i;
+
+ break;
+ }
+ }
+
+ assert lastExistingIdx0 >= 0;
+
+ lastExistingIdx = lastExistingIdx0;
+ }
+
+ /**
+ * Get parent entity.
+ *
+ * @return Parent entity.
+ */
+ public IgfsPathIds parent() {
+ assert ids.length > 1;
+
+ String[] parentParts = new String[parts.length - 1];
+ IgniteUuid[] parentIds = new IgniteUuid[ids.length - 1];
+
+ System.arraycopy(parts, 0, parentParts, 0, parentParts.length);
+ System.arraycopy(ids, 0, parentIds, 0, parentIds.length);
+
+ return new IgfsPathIds(path.parent(), parentParts, parentIds);
+ }
+
+ /**
+ * Get number of elements.
+ *
+ * @return ID count.
+ */
+ public int count() {
+ return ids.length;
+ }
+
+ /**
+ * Get original path.
+ *
+ * @return Path.
+ */
+ public IgfsPath path() {
+ return path;
+ }
+
+ /**
+ * Get path part at the given index.
+ *
+ * @param idx Index.
+ * @return Path part.
+ */
+ public String part(int idx) {
+ assert idx < parts.length;
+
+ return parts[idx];
+ }
+
+ /**
+ * Get last part of original path.
+ *
+ * @return Last part.
+ */
+ public String lastPart() {
+ return parts[parts.length - 1];
+ }
+
+ /**
+ * Get last ID.
+ *
+ * @return Last ID.
+ */
+ @Nullable public IgniteUuid lastId() {
+ return ids[ids.length - 1];
+ }
+
+ /**
+ * Get last parent ID.
+ *
+ * @return Last parent ID.
+ */
+ @Nullable public IgniteUuid lastParentId() {
+ return ids[ids.length - 2];
+ }
+
+ /**
+ * Whether provided index denotes last entry in the path.
+ *
+ * @param idx Index.
+ * @return {@code True} if last.
+ */
+ public boolean isLastIndex(int idx) {
+ return idx == parts.length - 1;
+ }
+
+ /**
+ * Get path of the last existing element.
+ *
+ * @return Path of the last existing element.
+ */
+ public IgfsPath lastExistingPath() {
+ IgfsPath path = new IgfsPath();
+
+ for (int i = 1; i <= lastExistingIdx; i++)
+ path = new IgfsPath(path, parts[i]);
+
+ return path;
+ }
+
+ /**
+ * Whether all parts exist.
+ *
+ * @return {@code True} if all parts were found.
+ */
+ public boolean allExists() {
+ return parts.length == lastExistingIdx + 1;
+ }
+
+ /**
+ * Whether last entry exists.
+ *
+ * @return {@code True} if exists.
+ */
+ public boolean lastExists() {
+ return lastExistingIdx == ids.length - 1;
+ }
+
+
+ /**
+ * Whether parent of the last entry exists.
+ *
+ * @return {@code True} if exists.
+ */
+ public boolean lastParentExists() {
+ return ids.length > 1 && lastExistingIdx == ids.length - 2;
+ }
+
+ /**
+ * Get ID of the last existing entry.
+ *
+ * @return ID of the last existing entry.
+ */
+ public IgniteUuid lastExistingId() {
+ return ids[lastExistingIdx];
+ }
+
+ /**
+ * Get index of the last existing entry.
+ *
+ * @return Index of the last existing entry.
+ */
+ public int lastExistingIndex() {
+ return lastExistingIdx;
+ }
+
+ /**
+ * Add existing IDs to provided collection.
+ *
+ * @param col Collection.
+ */
+ @SuppressWarnings("ManualArrayToCollectionCopy")
+ public void addExistingIds(Collection<IgniteUuid> col) {
+ for (int i = 0; i <= lastExistingIdx; i++)
+ col.add(ids[i]);
+ }
+
+ /**
+ * Add surrogate IDs to provided collection potentially creating them on demand.
+ *
+ * @param col Collection.
+ */
+ @SuppressWarnings("ManualArrayToCollectionCopy")
+ public void addSurrogateIds(Collection<IgniteUuid> col) {
+ if (surrogateIds == null) {
+ surrogateIds = new IgniteUuid[ids.length];
+
+ for (int i = lastExistingIdx + 1; i < surrogateIds.length; i++)
+ surrogateIds[i] = IgniteUuid.randomUuid();
+ }
+
+ for (int i = lastExistingIdx + 1; i < surrogateIds.length; i++)
+ col.add(surrogateIds[i]);
+ }
+
+ /**
+ * Get surrogate ID at the given index.
+ *
+ * @param idx Index.
+ * @return Surrogate ID.
+ */
+ public IgniteUuid surrogateId(int idx) {
+ assert surrogateIds != null;
+
+ assert idx > lastExistingIdx;
+ assert idx < surrogateIds.length;
+
+ return surrogateIds[idx];
+ }
+
+ /**
+ * Verify that observed paths are found in provided infos in the right order.
+ *
+ * @param infos Info.
+ * @return {@code True} if full integrity is preserved.
+ */
+ public boolean verifyIntegrity(Map<IgniteUuid, IgfsFileInfo> infos) {
+ for (int i = 0; i <= lastExistingIdx; i++) {
+ IgniteUuid curId = ids[i];
+ IgfsFileInfo curInfo = infos.get(curId);
+
+ // Check if required ID is there.
+ if (curInfo == null)
+ return false;
+
+ // For non-leaf entry we check if child exists.
+ if (i < lastExistingIdx) {
+ String childName = parts[i + 1];
+ IgniteUuid childId = ids[i + 1];
+
+ if (!curInfo.hasChild(childName, childId))
+ return false;
+ }
+ }
+
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ebf40752/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathsCreateResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathsCreateResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathsCreateResult.java
new file mode 100644
index 0000000..3b620f8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathsCreateResult.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+
+import java.util.List;
+
+/**
+ * IGFS paths create result.
+ */
+public class IgfsPathsCreateResult {
+ /** Created paths. */
+ private final List<IgfsPath> paths;
+
+ /** Info of the last created file. */
+ private final IgfsFileInfo info;
+
+ /** Parent ID. */
+ private final IgniteUuid parentId;
+
+ /**
+ * Constructor.
+ *
+ * @param paths Created paths.
+ * @param info Info of the last created file.
+ * @param parentId Parent ID.
+ */
+ public IgfsPathsCreateResult(List<IgfsPath> paths, IgfsFileInfo info, IgniteUuid parentId) {
+ this.paths = paths;
+ this.info = info;
+ this.parentId = parentId;
+ }
+
+ /**
+ * @return Created paths.
+ */
+ public List<IgfsPath> createdPaths() {
+ return paths;
+ }
+
+ /**
+ * @return Info of the last created file.
+ */
+ public IgfsFileInfo info() {
+ return info;
+ }
+
+ /**
+ * @return Parent ID.
+ */
+ public IgniteUuid parentId() {
+ return parentId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IgfsPathsCreateResult.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ebf40752/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
index edded2f..b176e21 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
@@ -17,10 +17,6 @@
package org.apache.ignite.internal.processors.igfs;
-import java.lang.reflect.Constructor;
-import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
-
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteSystemProperties;
@@ -43,6 +39,10 @@ import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.transactions.Transaction;
import org.jetbrains.annotations.Nullable;
+import java.lang.reflect.Constructor;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_RETRIES_COUNT;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
@@ -104,7 +104,17 @@ public class IgfsUtils {
* @return {@code True} if this is root ID or trash ID.
*/
public static boolean isRootOrTrashId(@Nullable IgniteUuid id) {
- return id != null && (ROOT_ID.equals(id) || isTrashId(id));
+ return isRootId(id) || isTrashId(id);
+ }
+
+ /**
+ * Check whether provided ID is root ID.
+ *
+ * @param id ID.
+ * @return {@code True} if this is root ID.
+ */
+ public static boolean isRootId(@Nullable IgniteUuid id) {
+ return id != null && ROOT_ID.equals(id);
}
/**
@@ -114,7 +124,8 @@ public class IgfsUtils {
* @return {@code True} if this is trash ID.
*/
private static boolean isTrashId(IgniteUuid id) {
- assert id != null;
+ if (id == null)
+ return false;
UUID gid = id.globalId();
http://git-wip-us.apache.org/repos/asf/ignite/blob/ebf40752/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
index 52d8bd5..2acf59c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
@@ -17,27 +17,6 @@
package org.apache.ignite.internal.processors.igfs;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.reflect.Field;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
@@ -79,6 +58,28 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.jetbrains.annotations.Nullable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
@@ -1449,19 +1450,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
createCtr.incrementAndGet();
}
catch (IgniteException e) {
- Throwable[] chain = X.getThrowables(e);
-
- Throwable cause = chain[chain.length - 1];
-
- if (!e.getMessage().startsWith("Failed to overwrite file (file is opened for writing)")
- && (cause == null
- || !cause.getMessage().startsWith("Failed to overwrite file (file is opened for writing)"))) {
-
- System.out.println("Failed due to IgniteException exception. Cause:");
- cause.printStackTrace(System.out);
-
- err.compareAndSet(null, e);
- }
+ // No-op.
}
catch (IOException e) {
err.compareAndSet(null, e);
@@ -1937,15 +1926,8 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
chunksCtr.incrementAndGet();
}
- catch (IgniteException e) {
- Throwable[] chain = X.getThrowables(e);
-
- Throwable cause = chain[chain.length - 1];
-
- if (!e.getMessage().startsWith("Failed to open file (file is opened for writing)")
- && (cause == null
- || !cause.getMessage().startsWith("Failed to open file (file is opened for writing)")))
- err.compareAndSet(null, e);
+ catch (IgniteException ignore) {
+ // No-op.
}
catch (IOException e) {
err.compareAndSet(null, e);
[11/12] ignite git commit: Merge branch 'ignite-2834' into ignite-2813
Posted by vo...@apache.org.
Merge branch 'ignite-2834' into ignite-2813
# Conflicts:
# modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
# modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
# modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7a41eda2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7a41eda2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7a41eda2
Branch: refs/heads/ignite-2813
Commit: 7a41eda213fcd93e470efe0dbbd7a0d974b3ff0f
Parents: faa897f 1b10643
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri Mar 18 17:01:48 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Mar 18 17:01:48 2016 +0300
----------------------------------------------------------------------
.../configuration/FileSystemConfiguration.java | 47 +
.../java/org/apache/ignite/igfs/IgfsPath.java | 9 +
.../apache/ignite/internal/IgniteKernal.java | 2 +-
.../org/apache/ignite/internal/IgnitionEx.java | 10 +-
.../managers/communication/GridIoManager.java | 10 +
.../GridCachePartitionExchangeManager.java | 3 +
.../processors/cache/GridCacheUtils.java | 14 +-
.../IgfsColocatedMetadataAffinityKeyMapper.java | 47 +
.../internal/processors/igfs/IgfsImpl.java | 59 +-
.../processors/igfs/IgfsMetaManager.java | 1014 +++++++++---------
.../internal/processors/igfs/IgfsPathIds.java | 291 +++++
.../processors/igfs/IgfsPathsCreateResult.java | 77 ++
.../internal/processors/igfs/IgfsUtils.java | 79 +-
.../util/nio/GridNioRecoveryDescriptor.java | 21 +-
.../ignite/internal/util/nio/GridNioServer.java | 52 +-
.../communication/tcp/TcpCommunicationSpi.java | 8 +
.../tcp/TcpCommunicationSpiMBean.java | 8 +-
.../ComputeJobCancelWithServiceSelfTest.java | 154 +++
.../processors/igfs/IgfsAbstractSelfTest.java | 39 +-
.../igfs/IgfsMetaManagerSelfTest.java | 31 +-
.../processors/igfs/IgfsProcessorSelfTest.java | 26 +-
.../processors/igfs/IgfsStartCacheTest.java | 7 +-
.../ignite/testsuites/IgniteCacheTestSuite.java | 3 +-
.../testsuites/IgniteKernalSelfTestSuite.java | 2 +
.../apache/ignite/igfs/IgfsEventsTestSuite.java | 10 +-
25 files changed, 1415 insertions(+), 608 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7a41eda2/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 7fea118,9ec583c..398428a
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@@ -1033,8 -1034,15 +1034,15 @@@ public final class IgfsImpl implements
else
dirProps = fileProps = new HashMap<>(props);
- IgniteBiTuple<IgfsEntryInfo, IgniteUuid> t2 = meta.create(path, false/*append*/, overwrite, dirProps,
- cfg.getBlockSize(), affKey, evictExclude(path, true), fileProps);
- IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2 = meta.create(
++ IgniteBiTuple<IgfsEntryInfo, IgniteUuid> t2 = meta.create(
+ path,
+ dirProps,
+ overwrite,
+ cfg.getBlockSize(),
+ affKey,
+ evictExclude(path, true),
+ fileProps
+ );
assert t2 != null;
@@@ -1104,8 -1112,15 +1112,15 @@@
else
dirProps = fileProps = new HashMap<>(props);
- IgniteBiTuple<IgfsEntryInfo, IgniteUuid> t2 = meta.create(path, true/*append*/, false/*overwrite*/,
- dirProps, cfg.getBlockSize(), null/*affKey*/, evictExclude(path, true), fileProps);
- IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2 = meta.append(
++ IgniteBiTuple<IgfsEntryInfo, IgniteUuid> t2 = meta.append(
+ path,
+ dirProps,
+ create,
+ cfg.getBlockSize(),
+ null/*affKey*/,
+ evictExclude(path, true),
+ fileProps
+ );
assert t2 != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/7a41eda2/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index 67ca475,d91b0bc..d7d6048
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@@ -845,82 -901,46 +903,46 @@@ public class IgfsMetaManager extends Ig
IgniteInternalTx tx = startTx();
try {
- // 3. Obtain the locks.
- final Map<IgniteUuid, IgfsEntryInfo> allInfos = lockIds(allIds);
+ // Obtain the locks.
- final Map<IgniteUuid, IgfsFileInfo> lockInfos = lockIds(lockIds);
++ final Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(lockIds);
- // 4. Verify integrity of source directory.
- if (!verifyPathIntegrity(srcPath, srcPathIds, allInfos)) {
+ // Verify integrity of source and destination paths.
+ if (!srcPathIds.verifyIntegrity(lockInfos))
throw new IgfsPathNotFoundException("Failed to perform move because source directory " +
"structure changed concurrently [src=" + srcPath + ", dst=" + dstPath + ']');
- }
-
- // 5. Verify integrity of destination directory.
- final IgfsPath dstDirPath = dstLeafId != null ? dstPath : dstPath.parent();
- if (!verifyPathIntegrity(dstDirPath, dstPathIds, allInfos)) {
+ if (!dstPathIds.verifyIntegrity(lockInfos))
throw new IgfsPathNotFoundException("Failed to perform move because destination directory " +
"structure changed concurrently [src=" + srcPath + ", dst=" + dstPath + ']');
- }
-
- // 6. Calculate source and destination targets which will be changed.
- IgniteUuid srcTargetId = srcPathIds.get(srcPathIds.size() - 2);
- IgfsEntryInfo srcTargetInfo = allInfos.get(srcTargetId);
- String srcName = srcPath.name();
-
- IgniteUuid dstTargetId;
- IgfsEntryInfo dstTargetInfo;
- String dstName;
-
- if (dstLeafId != null) {
- // Destination leaf exists. Check if it is an empty directory.
- IgfsEntryInfo dstLeafInfo = allInfos.get(dstLeafId);
- assert dstLeafInfo != null;
+ // Additional check: is destination a directory?
- IgfsFileInfo dstParentInfo = lockInfos.get(dstPathIds.lastId());
++ IgfsEntryInfo dstParentInfo = lockInfos.get(dstPathIds.lastId());
- if (dstLeafInfo.isDirectory()) {
- // Destination is a directory.
- dstTargetId = dstLeafId;
- dstTargetInfo = dstLeafInfo;
- dstName = srcPath.name();
- }
- else {
- // Error, destination is existing file.
- throw new IgfsPathAlreadyExistsException("Failed to perform move " +
- "because destination points to " +
- "existing file [src=" + srcPath + ", dst=" + dstPath + ']');
- }
- }
- else {
- // Destination leaf doesn't exist, so we operate on parent.
- dstTargetId = dstPathIds.get(dstPathIds.size() - 1);
- dstTargetInfo = allInfos.get(dstTargetId);
- dstName = dstPath.name();
- }
-
- assert dstTargetInfo != null;
- assert dstTargetInfo.isDirectory();
+ if (dstParentInfo.isFile())
+ throw new IgfsPathAlreadyExistsException("Failed to perform move because destination points " +
+ "to existing file [src=" + srcPath + ", dst=" + dstPath + ']');
- // 7. Last check: does destination target already have listing entry with the same name?
- if (dstTargetInfo.hasChild(dstName)) {
+ // Additional check: does destination already have a child with the same name?
+ if (dstParentInfo.hasChild(dstName))
throw new IgfsPathAlreadyExistsException("Failed to perform move because destination already " +
"contains entry with the same name existing file [src=" + srcPath +
", dst=" + dstPath + ']');
- }
- // 8. Actual move: remove from source parent and add to destination target.
- IgfsListingEntry entry = srcTargetInfo.listing().get(srcName);
+ // Actual move: remove from source parent and add to destination target.
+ IgfsFileInfo srcParentInfo = lockInfos.get(srcPathIds.lastParentId());
- transferEntry(entry, srcTargetId, srcName, dstTargetId, dstName);
+ IgfsFileInfo srcInfo = lockInfos.get(srcPathIds.lastId());
+ String srcName = srcPathIds.lastPart();
+ IgfsListingEntry srcEntry = srcParentInfo.listing().get(srcName);
- tx.commit();
+ transferEntry(srcEntry, srcParentInfo.id(), srcName, dstParentInfo.id(), dstName);
- IgfsPath realNewPath = new IgfsPath(dstDirPath, dstName);
+ tx.commit();
- IgfsEntryInfo moved = allInfos.get(srcPathIds.get(srcPathIds.size() - 1));
+ IgfsPath newPath = new IgfsPath(dstPathIds.path(), dstName);
// Set the new path to the info to simplify event creation:
- return moved.path(realNewPath);
- return IgfsFileInfo.builder(srcInfo).path(newPath).build();
++ return IgfsEntryInfo.builder(srcInfo).path(newPath).build();
}
finally {
tx.close();
@@@ -1153,37 -1172,19 +1178,19 @@@
throw new IgfsDirectoryNotEmptyException("Failed to remove directory (directory is not " +
"empty and recursive flag is not set).");
- IgfsEntryInfo destInfo = infoMap.get(trashId);
-
- assert destInfo != null;
-
- final String srcFileName = path.name();
-
- final String destFileName = victimId.toString();
-
- assert !destInfo.hasChild(destFileName) : "Failed to add file name into the " +
- "destination directory (file already exists) [destName=" + destFileName + ']';
-
- IgfsEntryInfo srcParentInfo = infoMap.get(pathIdList.get(pathIdList.size() - 2));
+ // Prepare trash data.
+ IgfsFileInfo trashInfo = lockInfos.get(trashId);
+ final String trashName = victimId.toString();
- assert srcParentInfo != null;
+ assert !trashInfo.hasChild(trashName) : "Failed to add file name into the " +
+ "destination directory (file already exists) [destName=" + trashName + ']';
- IgniteUuid srcParentId = srcParentInfo.id();
- assert srcParentId.equals(pathIdList.get(pathIdList.size() - 2));
+ IgniteUuid parentId = pathIds.lastParentId();
- IgfsFileInfo parentInfo = lockInfos.get(parentId);
++ IgfsEntryInfo parentInfo = lockInfos.get(parentId);
- IgfsListingEntry srcEntry = srcParentInfo.listing().get(srcFileName);
-
- assert srcEntry != null : "Deletion victim not found in parent listing [path=" + path +
- ", name=" + srcFileName + ", listing=" + srcParentInfo.listing() + ']';
-
- assert victimId.equals(srcEntry.fileId());
-
- transferEntry(srcEntry, srcParentId, srcFileName, trashId, destFileName);
+ transferEntry(parentInfo.listing().get(victimName), parentId, victimName, trashId, trashName);
if (victimInfo.isFile())
- // Update a file info of the removed file with a file path,
- // which will be used by delete worker for event notifications.
invokeUpdatePath(victimId, path);
tx.commit();
@@@ -3219,19 -3179,10 +3212,10 @@@
this.entry = entry;
}
- /**
- * Empty constructor required for {@link Externalizable}.
- *
- */
- public ListingAddProcessor() {
- // No-op.
- }
-
/** {@inheritDoc} */
- @Override public Void process(MutableEntry<IgniteUuid, IgfsFileInfo> e, Object... args) {
- IgfsFileInfo fileInfo = e.getValue();
+ @Override public Void process(MutableEntry<IgniteUuid, IgfsEntryInfo> e, Object... args) {
+ IgfsEntryInfo fileInfo = e.getValue();
- assert fileInfo != null : "File info not found for the child: " + entry.fileId();
assert fileInfo.isDirectory();
Map<String, IgfsListingEntry> listing = new HashMap<>(fileInfo.listing());
@@@ -3508,219 -3438,286 +3473,286 @@@
}
}
- /** File chain builder. */
- private class DirectoryChainBuilder {
- /** The requested path to be created. */
- private final IgfsPath path;
+ /**
+ * Create a new file.
+ *
+ * @param path Path.
+ * @param dirProps Directory properties.
+ * @param overwrite Overwrite flag.
+ * @param blockSize Block size.
+ * @param affKey Affinity key.
+ * @param evictExclude Evict exclude flag.
+ * @param fileProps File properties.
+ * @return Tuple containing the created file info and its parent id.
+ */
- IgniteBiTuple<IgfsFileInfo, IgniteUuid> create(
++ IgniteBiTuple<IgfsEntryInfo, IgniteUuid> create(
+ final IgfsPath path,
+ Map<String, String> dirProps,
+ final boolean overwrite,
+ final int blockSize,
+ final @Nullable IgniteUuid affKey,
+ final boolean evictExclude,
+ @Nullable Map<String, String> fileProps) throws IgniteCheckedException {
+ validTxState(false);
- /** Full path components. */
- private final List<String> components;
+ while (true) {
+ if (busyLock.enterBusy()) {
+ try {
+ // Prepare path IDs.
+ IgfsPathIds pathIds = pathIds(path);
- /** The list of ids. */
- private final List<IgniteUuid> idList;
+ // Prepare lock IDs.
+ Set<IgniteUuid> lockIds = new TreeSet<>(PATH_ID_SORTING_COMPARATOR);
- /** The set of ids. */
- private final SortedSet<IgniteUuid> idSet = new TreeSet<IgniteUuid>(PATH_ID_SORTING_COMPARATOR);
+ pathIds.addExistingIds(lockIds);
+ pathIds.addSurrogateIds(lockIds);
- /** The middle node properties. */
- private final Map<String, String> props;
+ // In overwrite mode we also lock ID of potential replacement as well as trash ID.
+ IgniteUuid overwriteId = IgniteUuid.randomUuid();
+ IgniteUuid trashId = IgfsUtils.randomTrashId();
- /** The leaf node properties. */
- private final Map<String, String> leafProps;
+ if (overwrite) {
+ lockIds.add(overwriteId);
- /** The lowermost exsiting path id. */
- private final IgniteUuid lowermostExistingId;
+ // Trash ID is only added if we suspect conflict.
+ if (pathIds.allExists())
+ lockIds.add(trashId);
+ }
- /** The existing path. */
- private final IgfsPath existingPath;
+ // Start TX.
+ IgniteInternalTx tx = startTx();
- /** The created leaf info. */
- private IgfsEntryInfo leafInfo;
+ try {
+ Map<IgniteUuid, IgfsFileInfo> lockInfos = lockIds(lockIds);
- /** The leaf parent id. */
- private IgniteUuid leafParentId;
+ if (!pathIds.verifyIntegrity(lockInfos))
+ // Directory structure changed concurrently. So we simply re-try.
+ continue;
- /** The number of existing ids. */
- private final int existingIdCnt;
+ if (pathIds.allExists()) {
+ // All participants found.
+ IgfsFileInfo oldInfo = lockInfos.get(pathIds.lastId());
- /** Whether laef is directory. */
- private final boolean leafDir;
+ // Check: is it a file?
+ if (!oldInfo.isFile())
+ throw new IgfsPathIsDirectoryException("Failed to create a file: " + path);
- /** Block size. */
- private final int blockSize;
+ // Check: can we overwrite it?
+ if (!overwrite)
+ throw new IgfsPathAlreadyExistsException("Failed to create a file: " + path);
- /** Affinity key. */
- private final IgniteUuid affKey;
+ // Check if file already opened for write.
+ if (oldInfo.lockId() != null)
+ throw new IgfsException("File is already opened for write: " + path);
- /** Evict exclude flag. */
- private final boolean evictExclude;
+ // At this point file can be re-created safely.
- /**
- * Constructor for directories.
- *
- * @param path Path.
- * @param props Properties.
- * @throws IgniteCheckedException If failed.
- */
- protected DirectoryChainBuilder(IgfsPath path, Map<String, String> props) throws IgniteCheckedException {
- this(path, props, props, true, 0, null, false);
- }
+ // First step: add existing to trash listing.
+ IgniteUuid oldId = pathIds.lastId();
- /**
- * Constructor for files.
- *
- * @param path Path.
- * @param dirProps Directory properties.
- * @param fileProps File properties.
- * @param blockSize Block size.
- * @param affKey Affinity key (optional).
- * @param evictExclude Evict exclude flag.
- * @throws IgniteCheckedException If failed.
- */
- protected DirectoryChainBuilder(IgfsPath path, Map<String, String> dirProps, Map<String, String> fileProps,
- int blockSize, @Nullable IgniteUuid affKey, boolean evictExclude)
- throws IgniteCheckedException {
- this(path, dirProps, fileProps, false, blockSize, affKey, evictExclude);
- }
+ id2InfoPrj.invoke(trashId, new ListingAddProcessor(oldId.toString(),
+ new IgfsListingEntry(oldId, true)));
- /**
- * Constructor.
- *
- * @param path Path.
- * @param props Middle properties.
- * @param leafProps Leaf properties.
- * @param leafDir Whether leaf is directory or file.
- * @param blockSize Block size.
- * @param affKey Affinity key (optional).
- * @param evictExclude Evict exclude flag.
- * @throws IgniteCheckedException If failed.
- */
- private DirectoryChainBuilder(IgfsPath path, Map<String,String> props, Map<String,String> leafProps,
- boolean leafDir, int blockSize, @Nullable IgniteUuid affKey, boolean evictExclude)
- throws IgniteCheckedException {
- this.path = path;
- this.components = path.components();
- this.idList = fileIds(path);
- this.props = props;
- this.leafProps = leafProps;
- this.leafDir = leafDir;
- this.blockSize = blockSize;
- this.affKey = affKey;
- this.evictExclude = evictExclude;
+ // Second step: replace ID in parent directory.
+ String name = pathIds.lastPart();
+ IgniteUuid parentId = pathIds.lastParentId();
+
+ id2InfoPrj.invoke(parentId, new ListingReplaceProcessor(name, overwriteId));
+
+ // Third step: create the file.
+ long createTime = System.currentTimeMillis();
- // Store all the non-null ids in the set & construct existing path in one loop:
- IgfsPath existingPath = path.root();
+ IgfsFileInfo newInfo = invokeAndGet(overwriteId, new FileCreateProcessor(createTime,
+ fileProps, blockSize, affKey, createFileLockId(false), evictExclude));
- assert idList.size() == components.size() + 1;
+ // Fourth step: update path of removed file.
+ invokeUpdatePath(oldId, path);
- // Find the lowermost existing id:
- IgniteUuid lowermostExistingId = null;
+ // Prepare result and commit.
+ IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2 = new T2<>(newInfo, parentId);
- int idIdx = 0;
+ tx.commit();
- for (IgniteUuid id : idList) {
- if (id == null)
- break;
+ IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EventType.EVT_IGFS_FILE_OPENED_WRITE);
- lowermostExistingId = id;
+ return t2;
+ }
+ else {
+ // Create file and parent folders.
+ IgfsPathsCreateResult res =
+ createFile(pathIds, lockInfos, dirProps, fileProps, blockSize, affKey, evictExclude);
- boolean added = idSet.add(id);
+ if (res == null)
+ continue;
- assert added : "Not added id = " + id;
+ // Commit.
+ tx.commit();
- if (idIdx >= 1) // skip root.
- existingPath = new IgfsPath(existingPath, components.get(idIdx - 1));
+ // Generate events.
+ generateCreateEvents(res.createdPaths(), true);
- idIdx++;
+ return new T2<>(res.info(), res.parentId());
+ }
+ }
+ finally {
+ tx.close();
+ }
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
}
+ else
+ throw new IllegalStateException("Failed to mkdir because Grid is stopping. [path=" + path + ']');
+ }
+ }
- assert idSet.contains(IgfsUtils.ROOT_ID);
+ /**
+ * Create directory and its parents.
+ *
+ * @param pathIds Path IDs.
+ * @param lockInfos Lock infos.
+ * @param dirProps Directory properties.
+ * @return Result or {@code null} if the first parent already contained child with the same name.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable IgfsPathsCreateResult createDirectory(IgfsPathIds pathIds, Map<IgniteUuid, IgfsFileInfo> lockInfos,
+ Map<String, String> dirProps) throws IgniteCheckedException {
+ // Check if entry we are going to write to is directory.
+ if (lockInfos.get(pathIds.lastExistingId()).isFile())
+ throw new IgfsParentNotDirectoryException("Failed to create directory (parent " +
+ "element is not a directory)");
- this.lowermostExistingId = lowermostExistingId;
+ return createFileOrDirectory(true, pathIds, lockInfos, dirProps, null, 0, null, false);
+ }
- this.existingPath = existingPath;
+ /**
+ * Create file and all its parents.
+ *
+ * @param pathIds Paths IDs.
+ * @param lockInfos Lock infos.
+ * @param dirProps Directory properties.
+ * @param fileProps File properties.
+ * @param blockSize Block size.
+ * @param affKey Affinity key (optional)
+ * @param evictExclude Evict exclude flag.
+ * @return Result or {@code null} if the first parent already contained child with the same name.
+ * @throws IgniteCheckedException If failed.
+ */
+ @Nullable private IgfsPathsCreateResult createFile(IgfsPathIds pathIds, Map<IgniteUuid, IgfsFileInfo> lockInfos,
+ Map<String, String> dirProps, Map<String, String> fileProps, int blockSize, @Nullable IgniteUuid affKey,
+ boolean evictExclude) throws IgniteCheckedException{
+ // Check if entry we are going to write to is directory.
+ if (lockInfos.get(pathIds.lastExistingId()).isFile())
+ throw new IgfsParentNotDirectoryException("Failed to open file for write " +
+ "(parent element is not a directory): " + pathIds.path());
- this.existingIdCnt = idSet.size();
- }
+ return createFileOrDirectory(false, pathIds, lockInfos, dirProps, fileProps, blockSize, affKey, evictExclude);
+ }
- /**
- * Does the main portion of job building the renmaining path.
- */
- public final void doBuild() throws IgniteCheckedException {
- // Fix current time. It will be used in all created entities.
- long createTime = System.currentTimeMillis();
+ /**
+ * Create file or directory.
+ *
+ * @param dir Directory flag.
+ * @param pathIds Path IDs.
+ * @param lockInfos Lock infos.
+ * @param dirProps Directory properties.
+ * @param fileProps File properties.
+ * @param blockSize Block size.
+ * @param affKey Affinity key.
+ * @param evictExclude Evict exclude flag.
+ * @return Result.
+ * @throws IgniteCheckedException If failed.
+ */
+ private IgfsPathsCreateResult createFileOrDirectory(boolean dir, IgfsPathIds pathIds,
+ Map<IgniteUuid, IgfsFileInfo> lockInfos, Map<String, String> dirProps, Map<String, String> fileProps,
+ int blockSize, @Nullable IgniteUuid affKey, boolean evictExclude) throws IgniteCheckedException {
+ // This is our starting point.
+ int lastExistingIdx = pathIds.lastExistingIndex();
+ IgfsFileInfo lastExistingInfo = lockInfos.get(pathIds.lastExistingId());
- IgfsListingEntry childInfo = null;
- String childName = null;
+ // If current info already contains entry with the same name as it's child, then something
+ // has changed concurrently. We must re-try because we cannot get info of this unexpected
+ // element due to possible deadlocks.
+ int curIdx = lastExistingIdx + 1;
- IgniteUuid parentId = null;
+ String curPart = pathIds.part(curIdx);
+ IgniteUuid curId = pathIds.surrogateId(curIdx);
+ IgniteUuid curParentId = lastExistingInfo.id();
- // This loop creates the missing directory chain from the bottom to the top:
- for (int i = components.size() - 1; i >= existingIdCnt - 1; i--) {
- IgniteUuid childId = IgniteUuid.randomUuid();
- boolean childDir;
+ if (lastExistingInfo.hasChild(curPart))
+ return null;
- if (childName == null) {
- assert childInfo == null;
+ // First step: add new entry to the last existing element.
+ id2InfoPrj.invoke(lastExistingInfo.id(), new ListingAddProcessor(curPart,
+ new IgfsListingEntry(curId, dir || !pathIds.isLastIndex(curIdx))));
- if (leafDir) {
- childDir = true;
+ // Events support.
+ IgfsPath lastCreatedPath = pathIds.lastExistingPath();
- leafInfo = invokeAndGet(childId, new DirectoryCreateProcessor(createTime, leafProps));
- }
- else {
- childDir = false;
+ List<IgfsPath> createdPaths = new ArrayList<>(pathIds.count() - curIdx);
- leafInfo = invokeAndGet(childId, new FileCreateProcessor(createTime, leafProps, blockSize,
- affKey, createFileLockId(false), evictExclude));
- }
- }
- else {
- assert childInfo != null;
+ // Second step: create middle directories.
+ long createTime = System.currentTimeMillis();
- childDir = true;
+ while (curIdx < pathIds.count() - 1) {
+ int nextIdx = curIdx + 1;
- id2InfoPrj.invoke(childId, new DirectoryCreateProcessor(createTime, props, childName, childInfo));
+ String nextPart = pathIds.part(nextIdx);
+ IgniteUuid nextId = pathIds.surrogateId(nextIdx);
- if (parentId == null)
- parentId = childId;
- }
+ id2InfoPrj.invoke(curId, new DirectoryCreateProcessor(createTime, dirProps,
+ nextPart, new IgfsListingEntry(nextId, dir || !pathIds.isLastIndex(nextIdx))));
- childInfo = new IgfsListingEntry(childId, childDir);
+ // Save event.
+ lastCreatedPath = new IgfsPath(lastCreatedPath, curPart);
- childName = components.get(i);
- }
+ createdPaths.add(lastCreatedPath);
- if (parentId == null)
- parentId = lowermostExistingId;
+ // Advance things further.
+ curIdx++;
- leafParentId = parentId;
+ curParentId = curId;
- // Now link the newly created directory chain to the lowermost existing parent:
- id2InfoPrj.invoke(lowermostExistingId, new ListingAddProcessor(childName, childInfo));
+ curPart = nextPart;
+ curId = nextId;
}
- /**
- * Sends events.
- */
- public final void sendEvents() {
- if (evts.isRecordable(EVT_IGFS_DIR_CREATED)) {
- IgfsPath createdPath = existingPath;
+ // Third step: create leaf.
+ IgfsFileInfo info;
- for (int i = existingPath.components().size(); i < components.size() - 1; i++) {
- createdPath = new IgfsPath(createdPath, components.get(i));
+ if (dir)
+ info = invokeAndGet(curId, new DirectoryCreateProcessor(createTime, dirProps));
+ else
+ info = invokeAndGet(curId, new FileCreateProcessor(createTime, fileProps,
+ blockSize, affKey, createFileLockId(false), evictExclude));
- IgfsUtils.sendEvents(igfsCtx.kernalContext(), createdPath, EVT_IGFS_DIR_CREATED);
- }
- }
+ createdPaths.add(pathIds.path());
- if (leafDir)
- IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_DIR_CREATED);
- else {
- IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_CREATED);
- IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_WRITE);
- }
+ return new IgfsPathsCreateResult(createdPaths, info, curParentId);
+ }
+
+ /**
+ * Generate events for created file or directory.
+ *
+ * @param createdPaths Created paths.
+ * @param file Whether file was created.
+ */
+ private void generateCreateEvents(List<IgfsPath> createdPaths, boolean file) {
+ if (evts.isRecordable(EventType.EVT_IGFS_DIR_CREATED)) {
+ for (int i = 0; i < createdPaths.size() - 1; i++)
+ IgfsUtils.sendEvents(igfsCtx.kernalContext(), createdPaths.get(i),
+ EventType.EVT_IGFS_DIR_CREATED);
}
+
+ IgfsPath leafPath = createdPaths.get(createdPaths.size() - 1);
+
+ if (file) {
+ IgfsUtils.sendEvents(igfsCtx.kernalContext(), leafPath, EventType.EVT_IGFS_FILE_CREATED);
+ IgfsUtils.sendEvents(igfsCtx.kernalContext(), leafPath, EventType.EVT_IGFS_FILE_OPENED_WRITE);
+ }
+ else
+ IgfsUtils.sendEvents(igfsCtx.kernalContext(), leafPath, EventType.EVT_IGFS_DIR_CREATED);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/7a41eda2/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
index bd3c0b8,ef7d5c7..6844d42
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
@@@ -256,69 -269,56 +269,123 @@@ public class IgfsUtils
}
/**
+ * @param cfg Grid configuration.
+ * @param cacheName Cache name.
+ * @return {@code True} if this is IGFS data or meta cache.
+ */
+ public static boolean isIgfsCache(IgniteConfiguration cfg, @Nullable String cacheName) {
+ FileSystemConfiguration[] igfsCfgs = cfg.getFileSystemConfiguration();
+
+ if (igfsCfgs != null) {
+ for (FileSystemConfiguration igfsCfg : igfsCfgs) {
+ // IGFS config probably has not been validated yet => possible NPE, so we check for null.
+ if (igfsCfg != null) {
+ if (F.eq(cacheName, igfsCfg.getDataCacheName()) || F.eq(cacheName, igfsCfg.getMetaCacheName()))
+ return true;
+ }
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * Prepare cache configuration if this is IGFS meta or data cache.
+ *
+ * @param cfg Configuration.
+ * @param ccfg Cache configuration.
+ */
+ public static void prepareCacheConfiguration(IgniteConfiguration cfg, CacheConfiguration ccfg) {
+ FileSystemConfiguration[] igfsCfgs = cfg.getFileSystemConfiguration();
+
+ if (igfsCfgs != null) {
+ for (FileSystemConfiguration igfsCfg : igfsCfgs) {
+ if (igfsCfg != null) {
+ if (F.eq(ccfg.getName(), igfsCfg.getMetaCacheName())) {
+ ccfg.setCopyOnRead(false);
+
+ // Set co-located affinity mapper if needed.
+ if (igfsCfg.isColocateMetadata() && ccfg.getCacheMode() == CacheMode.REPLICATED &&
+ ccfg.getAffinityMapper() == null)
+ ccfg.setAffinityMapper(new IgfsColocatedMetadataAffinityKeyMapper());
+
+ return;
+ }
+
+ if (F.eq(ccfg.getName(), igfsCfg.getDataCacheName())) {
+ ccfg.setCopyOnRead(false);
+
+ return;
+ }
+ }
+ }
+ }
+ }
++
++ /**
+ * Create empty directory with the given ID.
+ *
+ * @param id ID.
+ * @return File info.
+ */
+ public static IgfsDirectoryInfo createDirectory(IgniteUuid id) {
+ return createDirectory(id, null, null);
+ }
+
+ /**
+ * Create directory.
+ *
+ * @param id ID.
+ * @param listing Listing.
+ * @param props Properties.
+ * @return File info.
+ */
+ public static IgfsDirectoryInfo createDirectory(
+ IgniteUuid id,
+ @Nullable Map<String, IgfsListingEntry> listing,
+ @Nullable Map<String, String> props) {
+ long time = System.currentTimeMillis();
+
+ return createDirectory(id, listing, props, time, time);
+ }
+
+ /**
+ * Create directory.
+ *
+ * @param id ID.
+ * @param listing Listing.
+ * @param props Properties.
+ * @param createTime Create time.
+ * @param modificationTime Modification time.
+ * @return File info.
+ */
+ public static IgfsDirectoryInfo createDirectory(
+ IgniteUuid id,
+ @Nullable Map<String, IgfsListingEntry> listing,
+ @Nullable Map<String,String> props,
+ long createTime,
+ long modificationTime) {
+ return new IgfsDirectoryInfo(id, listing, props, createTime, modificationTime);
+ }
+
+ /**
+ * Create file.
+ *
+ * @param id File ID.
+ * @param blockSize Block size.
+ * @param len Length.
+ * @param affKey Affinity key.
+ * @param lockId Lock ID.
+ * @param evictExclude Evict exclude flag.
+ * @param props Properties.
+ * @param accessTime Access time.
+ * @param modificationTime Modification time.
+ * @return File info.
+ */
+ public static IgfsFileInfo createFile(IgniteUuid id, int blockSize, long len, @Nullable IgniteUuid affKey,
+ @Nullable IgniteUuid lockId, boolean evictExclude, @Nullable Map<String, String> props, long accessTime,
+ long modificationTime) {
+ return new IgfsFileInfo(id, blockSize, len, affKey, props, null, lockId, accessTime, modificationTime,
+ evictExclude);
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/7a41eda2/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7a41eda2/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
index a42471d,26424f0..3dc2791
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManagerSelfTest.java
@@@ -209,10 -211,10 +211,10 @@@ public class IgfsMetaManagerSelfTest ex
return info;
}
- private IgfsFileInfo createFileAndGetInfo(String path) throws IgniteCheckedException {
+ private IgfsEntryInfo createFileAndGetInfo(String path) throws IgniteCheckedException {
IgfsPath p = path(path);
- IgniteBiTuple<IgfsEntryInfo, IgniteUuid> t2 = mgr.create(p, false, false, null, 400, null, false, null);
- IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2 = mgr.create(p, null, false, 400, null, false, null);
++ IgniteBiTuple<IgfsEntryInfo, IgniteUuid> t2 = mgr.create(p, null, false, 400, null, false, null);
assert t2 != null;
assert !t2.get1().isDirectory();
http://git-wip-us.apache.org/repos/asf/ignite/blob/7a41eda2/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorSelfTest.java
----------------------------------------------------------------------
[09/12] ignite git commit: Merge branch 'master' into ignite-2834
Posted by vo...@apache.org.
Merge branch 'master' into ignite-2834
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ae6675d7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ae6675d7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ae6675d7
Branch: refs/heads/ignite-2813
Commit: ae6675d73186b61e600cd7d1780abbc350b6cda9
Parents: cd0b132 88ffed1
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri Mar 18 16:40:34 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Mar 18 16:40:34 2016 +0300
----------------------------------------------------------------------
.../java/org/apache/ignite/igfs/IgfsPath.java | 9 +
.../managers/communication/GridIoManager.java | 10 +
.../GridCachePartitionExchangeManager.java | 3 +
.../internal/processors/igfs/IgfsImpl.java | 59 +-
.../processors/igfs/IgfsMetaManager.java | 1058 +++++++++---------
.../internal/processors/igfs/IgfsPathIds.java | 291 +++++
.../processors/igfs/IgfsPathsCreateResult.java | 77 ++
.../internal/processors/igfs/IgfsUtils.java | 23 +-
.../util/nio/GridNioRecoveryDescriptor.java | 21 +-
.../ignite/internal/util/nio/GridNioServer.java | 52 +-
.../communication/tcp/TcpCommunicationSpi.java | 8 +
.../tcp/TcpCommunicationSpiMBean.java | 8 +-
.../processors/igfs/IgfsAbstractSelfTest.java | 68 +-
.../igfs/IgfsMetaManagerSelfTest.java | 31 +-
.../processors/igfs/IgfsProcessorSelfTest.java | 26 +-
.../processors/igfs/IgfsStartCacheTest.java | 9 +-
.../ignite/testsuites/IgniteCacheTestSuite.java | 3 +-
.../apache/ignite/igfs/IgfsEventsTestSuite.java | 10 +-
18 files changed, 1139 insertions(+), 627 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae6675d7/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
----------------------------------------------------------------------
[10/12] ignite git commit: IGNITE-2834: Minors.
Posted by vo...@apache.org.
IGNITE-2834: Minors.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1b106438
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1b106438
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1b106438
Branch: refs/heads/ignite-2813
Commit: 1b1064384ebcfac34ad234edfa92bff1c5b899c1
Parents: ae6675d
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri Mar 18 16:42:38 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Mar 18 16:42:38 2016 +0300
----------------------------------------------------------------------
.../src/main/java/org/apache/ignite/internal/IgnitionEx.java | 2 +-
.../ignite/internal/managers/discovery/GridDiscoveryManager.java | 3 +--
.../processors/cache/binary/CacheObjectBinaryProcessorImpl.java | 3 +--
.../org/apache/ignite/internal/processors/igfs/IgfsUtils.java | 1 +
.../ignite/internal/visor/node/VisorNodeDataCollectorJob.java | 4 ++--
5 files changed, 6 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b106438/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index f560917..533b6d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1951,7 +1951,7 @@ public class IgnitionEx {
cfg.setCacheConfiguration(cacheCfgs.toArray(new CacheConfiguration[cacheCfgs.size()]));
- // Iterate over IGFS caches and prepare their configuration if needed.
+ // Iterate over IGFS caches and prepare their configurations if needed.
assert cfg.getCacheConfiguration() != null;
for (CacheConfiguration ccfg : cfg.getCacheConfiguration())
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b106438/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 7936a04..7b795d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -75,7 +75,6 @@ import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
-import org.apache.ignite.internal.processors.igfs.IgfsUtils;
import org.apache.ignite.internal.processors.jobmetrics.GridJobMetrics;
import org.apache.ignite.internal.processors.security.SecurityContext;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
@@ -1623,7 +1622,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
CachePredicate pred = entry.getValue();
- if (!CU.isSystemCache(cacheName) && !IgfsUtils.isIgfsCache(ctx.config(), cacheName) &&
+ if (!CU.isSystemCache(cacheName) && !CU.isIgfsCache(ctx.config(), cacheName) &&
pred != null && pred.cacheNode(node))
caches.put(cacheName, pred.cacheMode);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b106438/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index 0a73fe0..18e2d09 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -82,7 +82,6 @@ import org.apache.ignite.internal.processors.cache.query.CacheQuery;
import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessorImpl;
-import org.apache.ignite.internal.processors.igfs.IgfsUtils;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.lang.GridMapEntry;
@@ -736,7 +735,7 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
assert cfg != null;
boolean binaryEnabled = marsh instanceof BinaryMarshaller && !GridCacheUtils.isSystemCache(cfg.getName()) &&
- !IgfsUtils.isIgfsCache(ctx.config(), cfg.getName());
+ !GridCacheUtils.isIgfsCache(ctx.config(), cfg.getName());
CacheObjectContext ctx0 = super.contextForCache(cfg);
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b106438/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
index 8b15943..ef7d5c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
@@ -291,6 +291,7 @@ public class IgfsUtils {
/**
* Prepare cache configuration if this is IGFS meta or data cache.
+ *
* @param cfg Configuration.
* @param ccfg Cache configuration.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/1b106438/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
index e47b013..f996d9a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java
@@ -24,7 +24,6 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
import org.apache.ignite.internal.processors.igfs.IgfsProcessorAdapter;
-import org.apache.ignite.internal.processors.igfs.IgfsUtils;
import org.apache.ignite.internal.util.ipc.IpcServerEndpoint;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -36,6 +35,7 @@ import org.apache.ignite.internal.visor.igfs.VisorIgfs;
import org.apache.ignite.internal.visor.igfs.VisorIgfsEndpoint;
import org.apache.ignite.lang.IgniteProductVersion;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isIgfsCache;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isSystemCache;
import static org.apache.ignite.internal.visor.compute.VisorComputeMonitoringHolder.COMPUTE_MONITORING_HOLDER_KEY;
import static org.apache.ignite.internal.visor.util.VisorTaskUtils.VISOR_TASK_EVTS;
@@ -132,7 +132,7 @@ public class VisorNodeDataCollectorJob extends VisorJob<VisorNodeDataCollectorTa
GridCacheProcessor cacheProc = ignite.context().cache();
for (String cacheName : cacheProc.cacheNames()) {
- if (arg.systemCaches() || !(isSystemCache(cacheName) || IgfsUtils.isIgfsCache(cfg, cacheName))) {
+ if (arg.systemCaches() || !(isSystemCache(cacheName) || isIgfsCache(cfg, cacheName))) {
long start0 = U.currentTimeMillis();
try {
[04/12] ignite git commit: IGNITE-2834: Implemented (2).
Posted by vo...@apache.org.
IGNITE-2834: Implemented (2).
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/cd0b1326
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/cd0b1326
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/cd0b1326
Branch: refs/heads/ignite-2813
Commit: cd0b132658e32ce4ff82e91c2c16a07a416d14f1
Parents: 83048d5
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Thu Mar 17 17:03:30 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Mar 17 17:03:30 2016 +0300
----------------------------------------------------------------------
.../ignite/internal/processors/cache/GridCacheUtils.java | 10 ++++++++++
1 file changed, 10 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/cd0b1326/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 617d9b8..1cdd303 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -73,6 +73,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx;
+import org.apache.ignite.internal.processors.igfs.IgfsUtils;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
import org.apache.ignite.internal.util.lang.IgniteInClosureX;
import org.apache.ignite.internal.util.typedef.C1;
@@ -1377,6 +1378,15 @@ public class GridCacheUtils {
// }
/**
+ * @param cfg Grid configuration.
+ * @param cacheName Cache name.
+ * @return {@code True} if this is IGFS data or meta cache.
+ */
+ public static boolean isIgfsCache(IgniteConfiguration cfg, @Nullable String cacheName) {
+ return IgfsUtils.isIgfsCache(cfg, cacheName);
+ }
+
+ /**
* Convert TTL to expire time.
*
* @param ttl TTL.
[02/12] ignite git commit: CacheEntryProcessorCopySelfTest should be
added to suite conditionally
Posted by vo...@apache.org.
CacheEntryProcessorCopySelfTest should be added to suite conditionally
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3e53f172
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3e53f172
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3e53f172
Branch: refs/heads/ignite-2813
Commit: 3e53f17224c2fccfcce50b347a0dcc6aca543413
Parents: d3420e6
Author: agura <ag...@gridgain.com>
Authored: Thu Mar 17 16:03:20 2016 +0300
Committer: agura <ag...@gridgain.com>
Committed: Thu Mar 17 16:14:01 2016 +0300
----------------------------------------------------------------------
.../java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3e53f172/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 2613175..9892ff7 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -180,7 +180,7 @@ public class IgniteCacheTestSuite extends TestSuite {
suite.addTestSuite(IgniteCacheAtomicLocalWithStoreInvokeTest.class);
suite.addTestSuite(IgniteCacheTxInvokeTest.class);
suite.addTestSuite(IgniteCacheEntryProcessorCallTest.class);
- suite.addTestSuite(CacheEntryProcessorCopySelfTest.class);
+ GridTestUtils.addTestIfNeeded(suite, CacheEntryProcessorCopySelfTest.class, ignoredTests);
suite.addTestSuite(IgniteCacheTxNearEnabledInvokeTest.class);
suite.addTestSuite(IgniteCacheTxLocalInvokeTest.class);
suite.addTestSuite(IgniteCrossCacheTxStoreSelfTest.class);
@@ -295,6 +295,7 @@ public class IgniteCacheTestSuite extends TestSuite {
suite.addTestSuite(CachePutEventListenerErrorSelfTest.class);
suite.addTestSuite(IgniteTxConfigCacheSelfTest.class);
+
return suite;
}
}
[12/12] ignite git commit: IGNITE-2813: Merge from master.
Posted by vo...@apache.org.
IGNITE-2813: Merge from master.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dc71d212
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dc71d212
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dc71d212
Branch: refs/heads/ignite-2813
Commit: dc71d212cb476590d4e656ae9ab08a388a0da97f
Parents: 7a41eda
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri Mar 18 17:07:04 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri Mar 18 17:07:04 2016 +0300
----------------------------------------------------------------------
.../processors/igfs/IgfsMetaManager.java | 50 ++++++++++----------
.../internal/processors/igfs/IgfsPathIds.java | 4 +-
.../processors/igfs/IgfsPathsCreateResult.java | 6 +--
.../internal/processors/igfs/IgfsUtils.java | 1 +
4 files changed, 30 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/dc71d212/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index d7d6048..84e4dae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -86,8 +86,6 @@ import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
-import static org.apache.ignite.internal.processors.igfs.IgfsFileInfo.builder;
-
/**
* Cache based structure (meta data) manager.
*/
@@ -929,9 +927,9 @@ public class IgfsMetaManager extends IgfsManager {
", dst=" + dstPath + ']');
// Actual move: remove from source parent and add to destination target.
- IgfsFileInfo srcParentInfo = lockInfos.get(srcPathIds.lastParentId());
+ IgfsEntryInfo srcParentInfo = lockInfos.get(srcPathIds.lastParentId());
- IgfsFileInfo srcInfo = lockInfos.get(srcPathIds.lastId());
+ IgfsEntryInfo srcInfo = lockInfos.get(srcPathIds.lastId());
String srcName = srcPathIds.lastPart();
IgfsListingEntry srcEntry = srcParentInfo.listing().get(srcName);
@@ -942,7 +940,7 @@ public class IgfsMetaManager extends IgfsManager {
IgfsPath newPath = new IgfsPath(dstPathIds.path(), dstName);
// Set the new path to the info to simplify event creation:
- return IgfsEntryInfo.builder(srcInfo).path(newPath).build();
+ return srcInfo.path(newPath);
}
finally {
tx.close();
@@ -1165,13 +1163,13 @@ public class IgfsMetaManager extends IgfsManager {
try {
// Lock participants.
- Map<IgniteUuid, IgfsFileInfo> lockInfos = lockIds(allIds);
+ Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(allIds);
// Ensure that all participants are still in place.
if (!pathIds.verifyIntegrity(lockInfos))
return null;
- IgfsFileInfo victimInfo = lockInfos.get(victimId);
+ IgfsEntryInfo victimInfo = lockInfos.get(victimId);
// Cannot delete non-empty directory if recursive flag is not set.
if (!recursive && victimInfo.hasChildren())
@@ -1179,7 +1177,7 @@ public class IgfsMetaManager extends IgfsManager {
"empty and recursive flag is not set).");
// Prepare trash data.
- IgfsFileInfo trashInfo = lockInfos.get(trashId);
+ IgfsEntryInfo trashInfo = lockInfos.get(trashId);
final String trashName = victimId.toString();
assert !trashInfo.hasChild(trashName) : "Failed to add file name into the " +
@@ -1684,7 +1682,7 @@ public class IgfsMetaManager extends IgfsManager {
IgniteInternalTx tx = startTx();
try {
- final Map<IgniteUuid, IgfsFileInfo> lockInfos = lockIds(lockIds);
+ final Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(lockIds);
if (!pathIds.verifyIntegrity(lockInfos))
// Directory structure changed concurrently. So we simply re-try.
@@ -3254,7 +3252,7 @@ public class IgfsMetaManager extends IgfsManager {
/**
* Listing replace processor.
*/
- private static final class ListingReplaceProcessor implements EntryProcessor<IgniteUuid, IgfsFileInfo, Void>,
+ private static final class ListingReplaceProcessor implements EntryProcessor<IgniteUuid, IgfsEntryInfo, Void>,
Externalizable {
/** */
private static final long serialVersionUID = 0L;
@@ -3284,9 +3282,9 @@ public class IgfsMetaManager extends IgfsManager {
}
/** {@inheritDoc} */
- @Override public Void process(MutableEntry<IgniteUuid, IgfsFileInfo> e, Object... args)
+ @Override public Void process(MutableEntry<IgniteUuid, IgfsEntryInfo> e, Object... args)
throws EntryProcessorException {
- IgfsFileInfo fileInfo = e.getValue();
+ IgfsEntryInfo fileInfo = e.getValue();
assert fileInfo.isDirectory();
@@ -3300,7 +3298,7 @@ public class IgfsMetaManager extends IgfsManager {
listing.put(name, new IgfsListingEntry(id, oldEntry.isDirectory()));
- e.setValue(new IgfsFileInfo(listing, fileInfo));
+ e.setValue(fileInfo.listing(listing));
return null;
}
@@ -3384,7 +3382,7 @@ public class IgfsMetaManager extends IgfsManager {
* @return Tuple containing the file info and its parent id.
* @throws IgniteCheckedException If failed.
*/
- IgniteBiTuple<IgfsFileInfo, IgniteUuid> append(
+ IgniteBiTuple<IgfsEntryInfo, IgniteUuid> append(
final IgfsPath path,
Map<String, String> dirProps,
final boolean create,
@@ -3414,7 +3412,7 @@ public class IgfsMetaManager extends IgfsManager {
IgniteInternalTx tx = startTx();
try {
- Map<IgniteUuid, IgfsFileInfo> lockInfos = lockIds(lockIds);
+ Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(lockIds);
if (!pathIds.verifyIntegrity(lockInfos))
// Directory structure changed concurrently. So we simply re-try.
@@ -3422,7 +3420,7 @@ public class IgfsMetaManager extends IgfsManager {
if (pathIds.allExists()) {
// All participants are found. Simply open the stream.
- IgfsFileInfo info = lockInfos.get(pathIds.lastId());
+ IgfsEntryInfo info = lockInfos.get(pathIds.lastId());
// Check: is it a file?
if (!info.isFile())
@@ -3435,7 +3433,7 @@ public class IgfsMetaManager extends IgfsManager {
// At this point we can open the stream safely.
info = invokeLock(info.id(), false);
- IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2 = new T2<>(info, pathIds.lastParentId());
+ IgniteBiTuple<IgfsEntryInfo, IgniteUuid> t2 = new T2<>(info, pathIds.lastParentId());
tx.commit();
@@ -3523,7 +3521,7 @@ public class IgfsMetaManager extends IgfsManager {
IgniteInternalTx tx = startTx();
try {
- Map<IgniteUuid, IgfsFileInfo> lockInfos = lockIds(lockIds);
+ Map<IgniteUuid, IgfsEntryInfo> lockInfos = lockIds(lockIds);
if (!pathIds.verifyIntegrity(lockInfos))
// Directory structure changed concurrently. So we simply re-try.
@@ -3531,7 +3529,7 @@ public class IgfsMetaManager extends IgfsManager {
if (pathIds.allExists()) {
// All participants found.
- IgfsFileInfo oldInfo = lockInfos.get(pathIds.lastId());
+ IgfsEntryInfo oldInfo = lockInfos.get(pathIds.lastId());
// Check: is it a file?
if (!oldInfo.isFile())
@@ -3562,14 +3560,14 @@ public class IgfsMetaManager extends IgfsManager {
// Third step: create the file.
long createTime = System.currentTimeMillis();
- IgfsFileInfo newInfo = invokeAndGet(overwriteId, new FileCreateProcessor(createTime,
+ IgfsEntryInfo newInfo = invokeAndGet(overwriteId, new FileCreateProcessor(createTime,
fileProps, blockSize, affKey, createFileLockId(false), evictExclude));
// Fourth step: update path of remove file.
invokeUpdatePath(oldId, path);
// Prepare result and commit.
- IgniteBiTuple<IgfsFileInfo, IgniteUuid> t2 = new T2<>(newInfo, parentId);
+ IgniteBiTuple<IgfsEntryInfo, IgniteUuid> t2 = new T2<>(newInfo, parentId);
tx.commit();
@@ -3616,7 +3614,7 @@ public class IgfsMetaManager extends IgfsManager {
* @return Result or {@code null} if the first parent already contained child with the same name.
* @throws IgniteCheckedException If failed.
*/
- @Nullable IgfsPathsCreateResult createDirectory(IgfsPathIds pathIds, Map<IgniteUuid, IgfsFileInfo> lockInfos,
+ @Nullable IgfsPathsCreateResult createDirectory(IgfsPathIds pathIds, Map<IgniteUuid, IgfsEntryInfo> lockInfos,
Map<String, String> dirProps) throws IgniteCheckedException {
// Check if entry we are going to write to is directory.
if (lockInfos.get(pathIds.lastExistingId()).isFile())
@@ -3639,7 +3637,7 @@ public class IgfsMetaManager extends IgfsManager {
* @return Result or {@code null} if the first parent already contained child with the same name.
* @throws IgniteCheckedException If failed.
*/
- @Nullable private IgfsPathsCreateResult createFile(IgfsPathIds pathIds, Map<IgniteUuid, IgfsFileInfo> lockInfos,
+ @Nullable private IgfsPathsCreateResult createFile(IgfsPathIds pathIds, Map<IgniteUuid, IgfsEntryInfo> lockInfos,
Map<String, String> dirProps, Map<String, String> fileProps, int blockSize, @Nullable IgniteUuid affKey,
boolean evictExclude) throws IgniteCheckedException{
// Check if entry we are going to write to is directory.
@@ -3665,11 +3663,11 @@ public class IgfsMetaManager extends IgfsManager {
* @throws IgniteCheckedException If failed.
*/
private IgfsPathsCreateResult createFileOrDirectory(boolean dir, IgfsPathIds pathIds,
- Map<IgniteUuid, IgfsFileInfo> lockInfos, Map<String, String> dirProps, Map<String, String> fileProps,
+ Map<IgniteUuid, IgfsEntryInfo> lockInfos, Map<String, String> dirProps, Map<String, String> fileProps,
int blockSize, @Nullable IgniteUuid affKey, boolean evictExclude) throws IgniteCheckedException {
// This is our starting point.
int lastExistingIdx = pathIds.lastExistingIndex();
- IgfsFileInfo lastExistingInfo = lockInfos.get(pathIds.lastExistingId());
+ IgfsEntryInfo lastExistingInfo = lockInfos.get(pathIds.lastExistingId());
// If current info already contains entry with the same name as it's child, then something
// has changed concurrently. We must re-try because we cannot get info of this unexpected
@@ -3719,7 +3717,7 @@ public class IgfsMetaManager extends IgfsManager {
}
// Third step: create leaf.
- IgfsFileInfo info;
+ IgfsEntryInfo info;
if (dir)
info = invokeAndGet(curId, new DirectoryCreateProcessor(createTime, dirProps));
http://git-wip-us.apache.org/repos/asf/ignite/blob/dc71d212/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathIds.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathIds.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathIds.java
index 1f669b0..2903239 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathIds.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathIds.java
@@ -267,10 +267,10 @@ public class IgfsPathIds {
* @param infos Info.
* @return {@code True} if full integrity is preserved.
*/
- public boolean verifyIntegrity(Map<IgniteUuid, IgfsFileInfo> infos) {
+ public boolean verifyIntegrity(Map<IgniteUuid, IgfsEntryInfo> infos) {
for (int i = 0; i <= lastExistingIdx; i++) {
IgniteUuid curId = ids[i];
- IgfsFileInfo curInfo = infos.get(curId);
+ IgfsEntryInfo curInfo = infos.get(curId);
// Check if required ID is there.
if (curInfo == null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/dc71d212/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathsCreateResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathsCreateResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathsCreateResult.java
index 3b620f8..9462aa4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathsCreateResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsPathsCreateResult.java
@@ -31,7 +31,7 @@ public class IgfsPathsCreateResult {
private final List<IgfsPath> paths;
/** Info of the last created file. */
- private final IgfsFileInfo info;
+ private final IgfsEntryInfo info;
/** Parent ID. */
private final IgniteUuid parentId;
@@ -43,7 +43,7 @@ public class IgfsPathsCreateResult {
* @param info Info of the last created file.
* @param parentId Parent ID.
*/
- public IgfsPathsCreateResult(List<IgfsPath> paths, IgfsFileInfo info, IgniteUuid parentId) {
+ public IgfsPathsCreateResult(List<IgfsPath> paths, IgfsEntryInfo info, IgniteUuid parentId) {
this.paths = paths;
this.info = info;
this.parentId = parentId;
@@ -59,7 +59,7 @@ public class IgfsPathsCreateResult {
/**
* @return Info of the last created file.
*/
- public IgfsFileInfo info() {
+ public IgfsEntryInfo info() {
return info;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/dc71d212/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
index 6844d42..325f636 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
@@ -43,6 +43,7 @@ import org.apache.ignite.transactions.Transaction;
import org.jetbrains.annotations.Nullable;
import java.lang.reflect.Constructor;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
[05/12] ignite git commit: Added ability to dump comm SPI stats to
log. (cherry picked from commit fa356e3)
Posted by vo...@apache.org.
Added ability to dump comm SPI stats to log.
(cherry picked from commit fa356e3)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0b10e0c8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0b10e0c8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0b10e0c8
Branch: refs/heads/ignite-2813
Commit: 0b10e0c8766dc41aa723881a75a6a37f3406f1aa
Parents: 3e53f17
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Fri Mar 18 16:09:35 2016 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Fri Mar 18 16:17:18 2016 +0300
----------------------------------------------------------------------
.../managers/communication/GridIoManager.java | 10 ++++
.../GridCachePartitionExchangeManager.java | 3 ++
.../util/nio/GridNioRecoveryDescriptor.java | 21 +++++++-
.../ignite/internal/util/nio/GridNioServer.java | 52 +++++++++++++++++++-
.../communication/tcp/TcpCommunicationSpi.java | 8 +++
.../tcp/TcpCommunicationSpiMBean.java | 8 ++-
6 files changed, 98 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/0b10e0c8/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 232ec2e..9ffbf4e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -1943,6 +1943,16 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
return getSpi().getOutboundMessagesQueueSize();
}
+ /**
+ * Dumps SPI stats to logs in case TcpCommunicationSpi is used, no-op otherwise.
+ */
+ public void dumpStats() {
+ CommunicationSpi spi = getSpi();
+
+ if (spi instanceof TcpCommunicationSpi)
+ ((TcpCommunicationSpi)spi).dumpStats();
+ }
+
/** {@inheritDoc} */
@Override public void printMemoryStats() {
X.println(">>>");
http://git-wip-us.apache.org/repos/asf/ignite/blob/0b10e0c8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index a0f7f93..1681f2f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1130,6 +1130,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
for (GridCacheContext cacheCtx : cctx.cacheContexts())
cacheCtx.preloader().dumpDebugInfo();
+
+ // Dump IO manager statistics.
+ cctx.gridIO().dumpStats();
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/0b10e0c8/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
index 685d260..409bded 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioRecoveryDescriptor.java
@@ -44,6 +44,9 @@ public class GridNioRecoveryDescriptor {
/** Number of received messages. */
private long rcvCnt;
+ /** Number of sent messages. */
+ private long sentCnt;
+
/** Reserved flag. */
private boolean reserved;
@@ -120,6 +123,13 @@ public class GridNioRecoveryDescriptor {
}
/**
+ * @return Number of sent messages.
+ */
+ public long sent() {
+ return sentCnt;
+ }
+
+ /**
* @param lastAck Last acknowledged message.
*/
public void lastAcknowledged(long lastAck) {
@@ -151,6 +161,8 @@ public class GridNioRecoveryDescriptor {
if (resendCnt == 0) {
msgFuts.addLast(fut);
+ sentCnt++;
+
return msgFuts.size() < queueLimit;
}
else
@@ -187,6 +199,13 @@ public class GridNioRecoveryDescriptor {
}
/**
+ * @return Last acked message by remote node.
+ */
+ public long acked() {
+ return acked;
+ }
+
+ /**
* Node left callback.
*
* @return {@code False} if descriptor is reserved.
@@ -379,4 +398,4 @@ public class GridNioRecoveryDescriptor {
@Override public String toString() {
return S.toString(GridNioRecoveryDescriptor.class, this);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/0b10e0c8/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 765b139..42c7ac7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -551,6 +551,14 @@ public class GridNioServer<T> {
}
/**
+ *
+ */
+ public void dumpStats() {
+ for (int i = 0; i < clientWorkers.size(); i++)
+ clientWorkers.get(i).offer(new NioOperationFuture<Void>(null, NioOperation.DUMP_STATS));
+ }
+
+ /**
* Establishes a session.
*
* @param ch Channel to register within the server and create session for.
@@ -1433,6 +1441,43 @@ public class GridNioServer<T> {
break;
}
+
+ case DUMP_STATS: {
+ StringBuilder sb = new StringBuilder();
+
+ Set<SelectionKey> keys = selector.keys();
+
+ sb.append(U.nl())
+ .append(">> Selector info [idx=").append(idx)
+ .append(", keysCnt=").append(keys.size())
+ .append("]").append(U.nl());
+
+ for (SelectionKey key : keys) {
+ GridSelectorNioSessionImpl ses = (GridSelectorNioSessionImpl)key.attachment();
+
+ sb.append(" Conn [")
+ .append("rmtAddr=").append(ses.remoteAddress())
+ .append(", locAddr=").append(ses.localAddress())
+ .append(", bytesRcvd=").append(ses.bytesReceived())
+ .append(", bytesSent=").append(ses.bytesSent());
+
+ if (ses.recoveryDescriptor() != null) {
+ sb.append(", msgsSent=").append(ses.recoveryDescriptor().sent())
+ .append(", msgsAckedByRmt=").append(ses.recoveryDescriptor().acked())
+ .append(", msgsRcvd=").append(ses.recoveryDescriptor().received());
+ }
+ else
+ sb.append(", recoveryDesc=null");
+
+ sb.append("]").append(U.nl());
+ }
+
+ if (log.isInfoEnabled())
+ log.info(sb.toString());
+
+ // Complete the request just in case (none should wait on this future).
+ req.onDone(true);
+ }
}
}
@@ -1991,7 +2036,10 @@ public class GridNioServer<T> {
PAUSE_READ,
/** Resume read. */
- RESUME_READ
+ RESUME_READ,
+
+ /** Dump statistics. */
+ DUMP_STATS
}
/**
@@ -2059,7 +2107,7 @@ public class GridNioServer<T> {
* @param op Requested operation.
*/
NioOperationFuture(GridSelectorNioSessionImpl ses, NioOperation op) {
- assert ses != null;
+ assert ses != null || op == NioOperation.DUMP_STATS : "Invalid params [ses=" + ses + ", op=" + op + ']';
assert op != null;
assert op != NioOperation.REGISTER;
http://git-wip-us.apache.org/repos/asf/ignite/blob/0b10e0c8/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 2a078ee..b283b82 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -1380,6 +1380,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
}
/** {@inheritDoc} */
+ @Override public void dumpStats() {
+ GridNioServer<Message> nioSrvr1 = nioSrvr;
+
+ if (nioSrvr1 != null)
+ nioSrvr1.dumpStats();
+ }
+
+ /** {@inheritDoc} */
@Override public Map<String, Object> getNodeAttributes() throws IgniteSpiException {
initFailureDetectionTimeout();
http://git-wip-us.apache.org/repos/asf/ignite/blob/0b10e0c8/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
index a785482..482e2ef 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpiMBean.java
@@ -287,4 +287,10 @@ public interface TcpCommunicationSpiMBean extends IgniteSpiManagementMBean {
*/
@MXBeanDescription("Slow client queue limit.")
public int getSlowClientQueueLimit();
-}
\ No newline at end of file
+
+ /**
+ * Dumps SPI per-connection stats to logs.
+ */
+ @MXBeanDescription("Dumps SPI statistics to logs.")
+ public void dumpStats();
+}