You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/06/08 12:19:42 UTC
[01/11] ignite git commit: ignite-3212 More debug logging on exchange
timeout. Fixed issue in GridCacheTxRecoveryFuture with message send failure
and late discovery event.
Repository: ignite
Updated Branches:
refs/heads/master 77793f5e2 -> 61210943c
ignite-3212 More debug logging on exchange timeout. Fixed issue in GridCacheTxRecoveryFuture with message send failure and late discovery event.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4e82af8a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4e82af8a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4e82af8a
Branch: refs/heads/master
Commit: 4e82af8a57d7a627e4d019273f8c5a40574694d3
Parents: 4799b26
Author: sboikov <sb...@gridgain.com>
Authored: Mon Jun 6 14:23:02 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Jun 6 14:23:02 2016 +0300
----------------------------------------------------------------------
.../apache/ignite/IgniteSystemProperties.java | 3 +
.../GridCachePartitionExchangeManager.java | 43 ++++++-
.../distributed/GridCacheTxRecoveryFuture.java | 58 ++++++---
.../GridDhtPartitionsExchangeFuture.java | 42 +++++-
.../cache/transactions/IgniteTxManager.java | 9 +-
.../IgniteCacheMessageWriteTimeoutTest.java | 129 +++++++++++++++++++
.../ignite/testsuites/IgniteCacheTestSuite.java | 2 +
7 files changed, 253 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4e82af8a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 5f761a2..d3ba9d9 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -377,6 +377,9 @@ public final class IgniteSystemProperties {
/** Number of times pending cache objects will be dumped to the log in case of partition exchange timeout. */
public static final String IGNITE_DUMP_PENDING_OBJECTS_THRESHOLD = "IGNITE_DUMP_PENDING_OBJECTS_THRESHOLD";
+ /** If this property is set to {@code true} then Ignite will log a thread dump in case of partition exchange timeout. */
+ public static final String IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT = "IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT";
+
/** JDBC driver cursor remove delay. */
public static final String IGNITE_JDBC_DRIVER_CURSOR_REMOVE_DELAY = "IGNITE_JDBC_DRIVER_CURSOR_RMV_DELAY";
http://git-wip-us.apache.org/repos/asf/ignite/blob/4e82af8a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 78a8d6c..260a504 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -33,12 +33,14 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
@@ -90,6 +92,7 @@ import org.jsr166.ConcurrentLinkedDeque8;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PRELOAD_RESEND_TIMEOUT;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT;
import static org.apache.ignite.IgniteSystemProperties.getLong;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
@@ -1106,16 +1109,17 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
- *
+ * @throws Exception If failed.
*/
- public void dumpDebugInfo() {
+ public void dumpDebugInfo() throws Exception {
dumpDebugInfo(null);
}
/**
- *
+ * @param exchTopVer Optional current exchange topology version.
+ * @throws Exception If failed.
*/
- public void dumpDebugInfo(@Nullable AffinityTopologyVersion exchTopVer) {
+ public void dumpDebugInfo(@Nullable AffinityTopologyVersion exchTopVer) throws Exception {
U.warn(log, "Ready affinity version: " + readyTopVer.get());
U.warn(log, "Last exchange future: " + lastInitializedFut);
@@ -1332,7 +1336,36 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
exchFut.init();
- exchFut.get();
+ int dumpedObjects = 0;
+
+ while (true) {
+ try {
+ exchFut.get(2 * cctx.gridConfig().getNetworkTimeout(), TimeUnit.MILLISECONDS);
+
+ break;
+ }
+ catch (IgniteFutureTimeoutCheckedException ignored) {
+ U.warn(log, "Failed to wait for partition map exchange [" +
+ "topVer=" + exchFut.topologyVersion() +
+ ", node=" + cctx.localNodeId() + "]. " +
+ "Dumping pending objects that might be the cause: ");
+
+ if (dumpedObjects < GridDhtPartitionsExchangeFuture.DUMP_PENDING_OBJECTS_THRESHOLD) {
+ try {
+ dumpDebugInfo(exchFut.topologyVersion());
+ }
+ catch (Exception e) {
+ U.error(log, "Failed to dump debug information: " + e, e);
+ }
+
+ if (IgniteSystemProperties.getBoolean(IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT, false))
+ U.dumpThreads(log);
+
+ dumpedObjects++;
+ }
+ }
+ }
+
if (log.isDebugEnabled())
log.debug("After waiting for exchange future [exchFut=" + exchFut + ", worker=" +
http://git-wip-us.apache.org/repos/asf/ignite/blob/4e82af8a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
index 5a4a1ef..01673b8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
@@ -18,7 +18,9 @@
package org.apache.ignite.internal.processors.cache.distributed;
import java.util.Collection;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
@@ -32,6 +34,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx
import org.apache.ignite.internal.util.GridLeanMap;
import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
@@ -41,6 +44,8 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.transactions.TransactionState.PREPARED;
+
/**
* Future verifying that all remote transactions related to transaction were prepared or committed.
*/
@@ -69,8 +74,9 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea
/** All involved nodes. */
private final Map<UUID, ClusterNode> nodes;
- /** ID of failed node started transaction. */
- private final UUID failedNodeId;
+ /** IDs of failed nodes that started the transaction. */
+ @GridToStringInclude
+ private final Set<UUID> failedNodeIds;
/** Transaction nodes mapping. */
private final Map<UUID, Collection<UUID>> txNodes;
@@ -81,13 +87,13 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea
/**
* @param cctx Context.
* @param tx Transaction.
- * @param failedNodeId ID of failed node started transaction.
+ * @param failedNodeIds IDs of failed nodes that started the transaction.
* @param txNodes Transaction mapping.
*/
@SuppressWarnings("ConstantConditions")
public GridCacheTxRecoveryFuture(GridCacheSharedContext<?, ?> cctx,
IgniteInternalTx tx,
- UUID failedNodeId,
+ Set<UUID> failedNodeIds,
Map<UUID, Collection<UUID>> txNodes)
{
super(CU.boolReducer());
@@ -95,7 +101,7 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea
this.cctx = cctx;
this.tx = tx;
this.txNodes = txNodes;
- this.failedNodeId = failedNodeId;
+ this.failedNodeIds = failedNodeIds;
if (log == null)
log = U.logger(cctx.kernalContext(), logRef, GridCacheTxRecoveryFuture.class);
@@ -105,7 +111,7 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea
UUID locNodeId = cctx.localNodeId();
for (Map.Entry<UUID, Collection<UUID>> e : tx.transactionNodes().entrySet()) {
- if (!locNodeId.equals(e.getKey()) && !failedNodeId.equals(e.getKey()) && !nodes.containsKey(e.getKey())) {
+ if (!locNodeId.equals(e.getKey()) && !failedNodeIds.contains(e.getKey()) && !nodes.containsKey(e.getKey())) {
ClusterNode node = cctx.discovery().node(e.getKey());
if (node != null)
@@ -115,7 +121,7 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea
}
for (UUID nodeId : e.getValue()) {
- if (!locNodeId.equals(nodeId) && !failedNodeId.equals(nodeId) && !nodes.containsKey(nodeId)) {
+ if (!locNodeId.equals(nodeId) && !failedNodeIds.contains(nodeId) && !nodes.containsKey(nodeId)) {
ClusterNode node = cctx.discovery().node(nodeId);
if (node != null)
@@ -128,7 +134,7 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea
UUID nearNodeId = tx.eventNodeId();
- nearTxCheck = !failedNodeId.equals(nearNodeId) && cctx.discovery().alive(nearNodeId);
+ nearTxCheck = !failedNodeIds.contains(nearNodeId) && cctx.discovery().alive(nearNodeId);
}
/**
@@ -170,7 +176,7 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea
cctx.io().send(nearNodeId, req, tx.ioPolicy());
}
catch (ClusterTopologyCheckedException ignore) {
- fut.onNodeLeft();
+ fut.onNodeLeft(nearNodeId);
}
catch (IgniteCheckedException e) {
fut.onError(e);
@@ -255,7 +261,7 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea
* send message only to primary node.
*/
- if (nodeId.equals(failedNodeId)) {
+ if (failedNodeIds.contains(nodeId)) {
for (UUID id : entry.getValue()) {
// Skip backup node if it is local node or if it is also was mapped as primary.
if (txNodes.containsKey(id) || id.equals(cctx.localNodeId()))
@@ -276,7 +282,7 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea
cctx.io().send(id, req, tx.ioPolicy());
}
catch (ClusterTopologyCheckedException ignored) {
- fut.onNodeLeft();
+ fut.onNodeLeft(id);
}
catch (IgniteCheckedException e) {
fut.onError(e);
@@ -302,7 +308,7 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea
cctx.io().send(nodeId, req, tx.ioPolicy());
}
catch (ClusterTopologyCheckedException ignored) {
- fut.onNodeLeft();
+ fut.onNodeLeft(nodeId);
}
catch (IgniteCheckedException e) {
fut.onError(e);
@@ -388,14 +394,20 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea
}
/** {@inheritDoc} */
- @Override public boolean onNodeLeft(UUID nodeId) {
- for (IgniteInternalFuture<?> fut : futures())
+ @Override public boolean onNodeLeft(final UUID nodeId) {
+ for (IgniteInternalFuture<?> fut : futures()) {
if (isMini(fut)) {
- MiniFuture f = (MiniFuture)fut;
+ final MiniFuture f = (MiniFuture)fut;
- if (f.nodeId().equals(nodeId))
- f.onNodeLeft();
+ if (f.nodeId().equals(nodeId)) {
+ cctx.kernalContext().closure().runLocalSafe(new Runnable() {
+ @Override public void run() {
+ f.onNodeLeft(nodeId);
+ }
+ });
+ }
}
+ }
return true;
}
@@ -507,14 +519,20 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea
}
/**
+ * @param nodeId Failed node ID.
*/
- private void onNodeLeft() {
+ private void onNodeLeft(UUID nodeId) {
if (log.isDebugEnabled())
log.debug("Transaction node left grid (will ignore) [fut=" + this + ']');
if (nearTxCheck) {
- // Near and originating nodes left, need initiate tx check.
- cctx.tm().commitIfPrepared(tx);
+ if (tx.state() == PREPARED) {
+ Set<UUID> failedNodeIds0 = new HashSet<>(failedNodeIds);
+ failedNodeIds0.add(nodeId);
+
+ // Near and originating nodes left, need to initiate tx check.
+ cctx.tm().commitIfPrepared(tx, failedNodeIds0);
+ }
onDone(new ClusterTopologyCheckedException("Transaction node left grid (will ignore)."));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4e82af8a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index b497f58..bea1957 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -73,6 +73,7 @@ import org.apache.ignite.lang.IgniteRunnable;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
@@ -85,7 +86,7 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYS
public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityTopologyVersion>
implements Comparable<GridDhtPartitionsExchangeFuture>, GridDhtTopologyFuture {
/** */
- private static final int DUMP_PENDING_OBJECTS_THRESHOLD =
+ public static final int DUMP_PENDING_OBJECTS_THRESHOLD =
IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_DUMP_PENDING_OBJECTS_THRESHOLD, 10);
/** */
@@ -891,7 +892,15 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
U.warn(log, "Failed to wait for partition release future [topVer=" + topologyVersion() +
", node=" + cctx.localNodeId() + "]. Dumping pending objects that might be the cause: ");
- cctx.exchange().dumpDebugInfo(topologyVersion());
+ try {
+ cctx.exchange().dumpDebugInfo(topologyVersion());
+ }
+ catch (Exception e) {
+ U.error(log, "Failed to dump debug information: " + e, e);
+ }
+
+ if (IgniteSystemProperties.getBoolean(IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT, false))
+ U.dumpThreads(log);
}
/**
@@ -1541,6 +1550,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
try {
boolean crdChanged = false;
boolean allReceived = false;
+ Set<UUID> reqFrom = null;
ClusterNode crd0;
@@ -1556,8 +1566,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
crd = srvNodes.size() > 0 ? srvNodes.get(0) : null;
}
- if (crd != null && crd.isLocal() && rmvd)
- allReceived = remaining.isEmpty();
+ if (crd != null && crd.isLocal()) {
+ if (rmvd)
+ allReceived = remaining.isEmpty();
+
+ if (crdChanged && !remaining.isEmpty())
+ reqFrom = new HashSet<>(remaining);
+ }
crd0 = crd;
}
@@ -1588,6 +1603,25 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
return;
}
+ if (crdChanged && reqFrom != null) {
+ GridDhtPartitionsSingleRequest req = new GridDhtPartitionsSingleRequest(exchId);
+
+ for (UUID nodeId : reqFrom) {
+ try {
+ // It is possible that some nodes finished exchange with previous coordinator.
+ cctx.io().send(nodeId, req, SYSTEM_POOL);
+ }
+ catch (ClusterTopologyCheckedException e) {
+ if (log.isDebugEnabled())
+ log.debug("Node left during partition exchange [nodeId=" + nodeId +
+ ", exchId=" + exchId + ']');
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to request partitions from node: " + nodeId, e);
+ }
+ }
+ }
+
for (Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage> m : singleMsgs.entrySet())
processMessage(m.getKey(), m.getValue());
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4e82af8a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 789ef8d..4ec280f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -1872,8 +1872,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
* transactions were prepared (invalidates transaction if it is not fully prepared).
*
* @param tx Transaction.
+ * @param failedNodeIds Failed nodes IDs.
*/
- public void commitIfPrepared(IgniteInternalTx tx) {
+ public void commitIfPrepared(IgniteInternalTx tx, Set<UUID> failedNodeIds) {
assert tx instanceof GridDhtTxLocal || tx instanceof GridDhtTxRemote : tx;
assert !F.isEmpty(tx.transactionNodes()) : tx;
assert tx.nearXidVersion() != null : tx;
@@ -1881,7 +1882,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
GridCacheTxRecoveryFuture fut = new GridCacheTxRecoveryFuture(
cctx,
tx,
- tx.originatingNodeId(),
+ failedNodeIds,
tx.transactionNodes());
cctx.mvcc().addFuture(fut, fut.futureId());
@@ -2147,7 +2148,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
// Check prepare only if originating node ID failed. Otherwise parent node will finish this tx.
if (tx.originatingNodeId().equals(evtNodeId)) {
if (tx.state() == PREPARED)
- commitIfPrepared(tx);
+ commitIfPrepared(tx, Collections.singleton(evtNodeId));
else {
IgniteInternalFuture<?> prepFut = tx.currentPrepareFuture();
@@ -2155,7 +2156,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
prepFut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut) {
if (tx.state() == PREPARED)
- commitIfPrepared(tx);
+ commitIfPrepared(tx, Collections.singleton(evtNodeId));
else if (tx.setRollbackOnly())
tx.rollbackAsync();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4e82af8a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
new file mode 100644
index 0000000..6256225
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheMessageWriteTimeoutTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ *
+ */
+public class IgniteCacheMessageWriteTimeoutTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+ TcpCommunicationSpi commSpi = (TcpCommunicationSpi)cfg.getCommunicationSpi();
+
+ // Try to provoke connection close on socket write timeout.
+ commSpi.setSharedMemoryPort(-1);
+ commSpi.setMessageQueueLimit(10);
+ commSpi.setSocketReceiveBuffer(32);
+ commSpi.setSocketSendBuffer(32);
+ commSpi.setSocketWriteTimeout(100);
+ commSpi.setUnacknowledgedMessagesBufferSize(1000);
+ commSpi.setConnectTimeout(10_000);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ super.afterTest();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testMessageQueueLimit() throws Exception {
+ startGridsMultiThreaded(3);
+
+ for (int i = 0; i < 15; i++) {
+ log.info("Iteration: " + i);
+
+ IgniteInternalFuture<?> fut1 = startJobThreads(50);
+
+ U.sleep(100);
+
+ IgniteInternalFuture<?> fut2 = startJobThreads(50);
+
+ fut1.get();
+ fut2.get();
+ }
+ }
+
+ /**
+ * @param cnt Threads count.
+ * @return Future.
+ */
+ private IgniteInternalFuture<?> startJobThreads(int cnt) {
+ final CyclicBarrier b = new CyclicBarrier(cnt);
+
+ return GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
+ @Override public Void call() throws Exception {
+ int idx = b.await();
+
+ Ignite node = ignite(idx % 3);
+
+ IgniteCompute comp = node.compute(node.cluster().forRemotes());
+
+ comp.run(new TestJob());
+
+ return null;
+ }
+
+ }, cnt, "job-thread");
+ }
+
+ /**
+ *
+ */
+ static class TestJob implements IgniteRunnable {
+ /** {@inheritDoc} */
+ @Override public void run() {
+ try {
+ long stop = System.currentTimeMillis() + 1000;
+
+ while (System.currentTimeMillis() < stop)
+ assertTrue(Math.sqrt(hashCode()) >= 0);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4e82af8a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 3238786..1ad74a5 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -125,6 +125,7 @@ import org.apache.ignite.internal.processors.cache.distributed.CacheAtomicNearUp
import org.apache.ignite.internal.processors.cache.distributed.CacheTxNearUpdateTopologyChangeTest;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheEntrySetIterationPreloadingSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheAtomicMessageRecoveryTest;
+import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheMessageWriteTimeoutTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheSystemTransactionsSelfTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheTxMessageRecoveryTest;
import org.apache.ignite.internal.processors.cache.distributed.IgniteCrossCacheTxStoreSelfTest;
@@ -276,6 +277,7 @@ public class IgniteCacheTestSuite extends TestSuite {
suite.addTestSuite(GridCacheMixedPartitionExchangeSelfTest.class);
suite.addTestSuite(IgniteCacheAtomicMessageRecoveryTest.class);
suite.addTestSuite(IgniteCacheTxMessageRecoveryTest.class);
+ suite.addTestSuite(IgniteCacheMessageWriteTimeoutTest.class);
GridTestUtils.addTestIfNeeded(suite, GridCacheOffHeapTieredEvictionAtomicSelfTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, GridCacheOffHeapTieredEvictionSelfTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, GridCacheOffHeapTieredAtomicSelfTest.class, ignoredTests);
[07/11] ignite git commit: IGNITE-3260: IGFS: Delete messages are no
longer passed.
Posted by vo...@apache.org.
IGNITE-3260: IGFS: Delete messages are no longer passed.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c300448b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c300448b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c300448b
Branch: refs/heads/master
Commit: c300448b94ed0d3f847197d1bbe67c31165c6ae6
Parents: a60bb3b
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Jun 6 18:12:42 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Jun 8 14:57:39 2016 +0300
----------------------------------------------------------------------
.../internal/processors/igfs/IgfsAsyncImpl.java | 6 -
.../processors/igfs/IgfsDataManager.java | 61 ++---
.../processors/igfs/IgfsDeleteWorker.java | 39 ---
.../ignite/internal/processors/igfs/IgfsEx.java | 9 -
.../internal/processors/igfs/IgfsImpl.java | 249 +++++--------------
.../internal/processors/igfs/IgfsUtils.java | 2 +-
.../ignite/igfs/IgfsFragmentizerSelfTest.java | 2 -
.../processors/igfs/IgfsSizeSelfTest.java | 133 ----------
.../HadoopDefaultMapReducePlannerSelfTest.java | 6 -
9 files changed, 83 insertions(+), 424 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c300448b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
index 8653f90..7530557 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
@@ -33,7 +33,6 @@ import org.apache.ignite.igfs.mapreduce.IgfsRecordResolver;
import org.apache.ignite.igfs.mapreduce.IgfsTask;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
import org.apache.ignite.internal.AsyncSupportAdapter;
-import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
@@ -166,11 +165,6 @@ public class IgfsAsyncImpl extends AsyncSupportAdapter<IgniteFileSystem> impleme
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> awaitDeletesAsync() throws IgniteCheckedException {
- return igfs.awaitDeletesAsync();
- }
-
- /** {@inheritDoc} */
@Nullable @Override public String clientLogDirectory() {
return igfs.clientLogDirectory();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c300448b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index 77fb0a9..66da05d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -33,7 +33,6 @@ import org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper;
import org.apache.ignite.igfs.IgfsOutOfSpaceException;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
-import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -1056,34 +1055,24 @@ public class IgfsDataManager extends IgfsManager {
private void processPartialBlockWrite(IgniteUuid fileId, IgfsBlockKey colocatedKey, int startOff,
byte[] data) throws IgniteCheckedException {
if (dataCachePrj.igfsDataSpaceUsed() >= dataCachePrj.igfsDataSpaceMax()) {
- try {
- igfs.awaitDeletesAsync().get(trashPurgeTimeout);
- }
- catch (IgniteFutureTimeoutCheckedException ignore) {
- // Ignore.
- }
+ final WriteCompletionFuture completionFut = pendingWrites.get(fileId);
- // Additional size check.
- if (dataCachePrj.igfsDataSpaceUsed() >= dataCachePrj.igfsDataSpaceMax()) {
- final WriteCompletionFuture completionFut = pendingWrites.get(fileId);
-
- if (completionFut == null) {
- if (log.isDebugEnabled())
- log.debug("Missing completion future for file write request (most likely exception occurred " +
- "which will be thrown upon stream close) [fileId=" + fileId + ']');
+ if (completionFut == null) {
+ if (log.isDebugEnabled())
+ log.debug("Missing completion future for file write request (most likely exception occurred " +
+ "which will be thrown upon stream close) [fileId=" + fileId + ']');
- return;
- }
+ return;
+ }
- IgfsOutOfSpaceException e = new IgfsOutOfSpaceException("Failed to write data block " +
- "(IGFS maximum data size exceeded) [used=" + dataCachePrj.igfsDataSpaceUsed() +
- ", allowed=" + dataCachePrj.igfsDataSpaceMax() + ']');
+ IgfsOutOfSpaceException e = new IgfsOutOfSpaceException("Failed to write data block " +
+ "(IGFS maximum data size exceeded) [used=" + dataCachePrj.igfsDataSpaceUsed() +
+ ", allowed=" + dataCachePrj.igfsDataSpaceMax() + ']');
- completionFut.onDone(new IgniteCheckedException("Failed to write data (not enough space on node): " +
- igfsCtx.kernalContext().localNodeId(), e));
+ completionFut.onDone(new IgniteCheckedException("Failed to write data (not enough space on node): " +
+ igfsCtx.kernalContext().localNodeId(), e));
- return;
- }
+ return;
}
// No affinity key present, just concat and return.
@@ -1225,26 +1214,10 @@ public class IgfsDataManager extends IgfsManager {
assert !blocks.isEmpty();
if (dataCachePrj.igfsDataSpaceUsed() >= dataCachePrj.igfsDataSpaceMax()) {
- try {
- try {
- igfs.awaitDeletesAsync().get(trashPurgeTimeout);
- }
- catch (IgniteFutureTimeoutCheckedException ignore) {
- // Ignore.
- }
-
- // Additional size check.
- if (dataCachePrj.igfsDataSpaceUsed() >= dataCachePrj.igfsDataSpaceMax())
- return new GridFinishedFuture<Object>(
- new IgfsOutOfSpaceException("Failed to write data block (IGFS maximum data size " +
- "exceeded) [used=" + dataCachePrj.igfsDataSpaceUsed() +
- ", allowed=" + dataCachePrj.igfsDataSpaceMax() + ']'));
-
- }
- catch (IgniteCheckedException e) {
- return new GridFinishedFuture<>(new IgniteCheckedException("Failed to store data " +
- "block due to unexpected exception.", e));
- }
+ return new GridFinishedFuture<Object>(
+ new IgfsOutOfSpaceException("Failed to write data block (IGFS maximum data size " +
+ "exceeded) [used=" + dataCachePrj.igfsDataSpaceUsed() +
+ ", allowed=" + dataCachePrj.igfsDataSpaceMax() + ']'));
}
return dataCachePrj.putAllAsync(blocks);
http://git-wip-us.apache.org/repos/asf/ignite/blob/c300448b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
index bac7569..3d41071 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
@@ -24,9 +24,7 @@ import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
-import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
-import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
@@ -51,9 +49,6 @@ public class IgfsDeleteWorker extends IgfsThread {
/** How many files/folders to delete at once (i.e in a single transaction). */
private static final int MAX_DELETE_BATCH = 100;
- /** IGFS context. */
- private final IgfsContext igfsCtx;
-
/** Metadata manager. */
private final IgfsMetaManager meta;
@@ -75,9 +70,6 @@ public class IgfsDeleteWorker extends IgfsThread {
/** Cancellation flag. */
private volatile boolean cancelled;
- /** Message topic. */
- private Object topic;
-
/**
* Constructor.
*
@@ -86,15 +78,9 @@ public class IgfsDeleteWorker extends IgfsThread {
IgfsDeleteWorker(IgfsContext igfsCtx) {
super("igfs-delete-worker%" + igfsCtx.igfs().name() + "%" + igfsCtx.kernalContext().localNodeId() + "%");
- this.igfsCtx = igfsCtx;
-
meta = igfsCtx.meta();
data = igfsCtx.data();
- String igfsName = igfsCtx.igfs().name();
-
- topic = F.isEmpty(igfsName) ? TOPIC_IGFS : TOPIC_IGFS.topic(igfsName);
-
assert meta != null;
assert data != null;
@@ -191,8 +177,6 @@ public class IgfsDeleteWorker extends IgfsThread {
if (log.isDebugEnabled())
log.debug("Sending delete confirmation message [name=" + entry.getKey() +
", fileId=" + fileId + ']');
-
- sendDeleteMessage(new IgfsDeleteMessage(fileId));
}
}
else
@@ -203,8 +187,6 @@ public class IgfsDeleteWorker extends IgfsThread {
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to delete entry from the trash directory: " + entry.getKey(), e);
-
- sendDeleteMessage(new IgfsDeleteMessage(fileId, e));
}
}
}
@@ -358,25 +340,4 @@ public class IgfsDeleteWorker extends IgfsThread {
return true; // Directory entry was deleted concurrently.
}
}
-
- /**
- * Send delete message to all meta cache nodes in the grid.
- *
- * @param msg Message to send.
- */
- private void sendDeleteMessage(IgfsDeleteMessage msg) {
- assert msg != null;
-
- Collection<ClusterNode> nodes = meta.metaCacheNodes();
-
- for (ClusterNode node : nodes) {
- try {
- igfsCtx.send(node, topic, msg, GridIoPolicy.IGFS_POOL);
- }
- catch (IgniteCheckedException e) {
- U.warn(log, "Failed to send IGFS delete message to node [nodeId=" + node.id() +
- ", msg=" + msg + ", err=" + e.getMessage() + ']');
- }
- }
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/c300448b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
index fb67e20..4c64bc9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsEx.java
@@ -23,7 +23,6 @@ import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteFileSystem;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
-import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
@@ -100,14 +99,6 @@ public interface IgfsEx extends IgniteFileSystem {
public long groupBlockSize();
/**
- * Asynchronously await for all entries existing in trash to be removed.
- *
- * @return Future which will be completed when all entries existed in trash by the time of invocation are removed.
- * @throws IgniteCheckedException If failed.
- */
- public IgniteInternalFuture<?> awaitDeletesAsync() throws IgniteCheckedException;
-
- /**
* Gets client file system log directory.
*
* @return Client file system log directory or {@code null} in case no client connections have been created yet.
http://git-wip-us.apache.org/repos/asf/ignite/blob/c300448b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index b2b031e..4dfb040 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -32,10 +32,9 @@ import org.apache.ignite.compute.ComputeJobResultPolicy;
import org.apache.ignite.compute.ComputeTaskSplitAdapter;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.FileSystemConfiguration;
-import org.apache.ignite.events.DiscoveryEvent;
-import org.apache.ignite.events.Event;
import org.apache.ignite.events.IgfsEvent;
import org.apache.ignite.igfs.IgfsBlockLocation;
+import org.apache.ignite.igfs.IgfsException;
import org.apache.ignite.igfs.IgfsFile;
import org.apache.ignite.igfs.IgfsInvalidPathException;
import org.apache.ignite.igfs.IgfsMetrics;
@@ -51,9 +50,7 @@ import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
-import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
-import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.hadoop.HadoopPayloadAware;
import org.apache.ignite.internal.processors.igfs.client.IgfsClientAffinityCallable;
import org.apache.ignite.internal.processors.igfs.client.IgfsClientDeleteCallable;
@@ -69,8 +66,6 @@ import org.apache.ignite.internal.processors.igfs.client.IgfsClientSummaryCallab
import org.apache.ignite.internal.processors.igfs.client.IgfsClientUpdateCallable;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.util.GridSpinBusyLock;
-import org.apache.ignite.internal.util.future.GridCompoundFuture;
-import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
@@ -100,11 +95,11 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.ignite.events.EventType.EVT_IGFS_DIR_DELETED;
@@ -114,14 +109,10 @@ import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_DELETED;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_READ;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_OPENED_WRITE;
import static org.apache.ignite.events.EventType.EVT_IGFS_META_UPDATED;
-import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
-import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.igfs.IgfsMode.DUAL_ASYNC;
import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC;
import static org.apache.ignite.igfs.IgfsMode.PRIMARY;
import static org.apache.ignite.igfs.IgfsMode.PROXY;
-import static org.apache.ignite.internal.GridTopic.TOPIC_IGFS;
-import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGFS;
/**
* Cache-based IGFS implementation.
@@ -130,6 +121,9 @@ public final class IgfsImpl implements IgfsEx {
/** Default permissions for file system entry. */
private static final String PERMISSION_DFLT_VAL = "0777";
+ /** Index generator for async format threads. */
+ private static final AtomicInteger FORMAT_THREAD_IDX_GEN = new AtomicInteger();
+
/** Default directory metadata. */
static final Map<String, String> DFLT_DIR_META = F.asMap(IgfsUtils.PROP_PERMISSION, PERMISSION_DFLT_VAL);
@@ -169,24 +163,12 @@ public final class IgfsImpl implements IgfsEx {
/** Writers map. */
private final ConcurrentHashMap8<IgfsPath, IgfsFileWorkerBatch> workerMap = new ConcurrentHashMap8<>();
- /** Delete futures. */
- private final ConcurrentHashMap8<IgniteUuid, GridFutureAdapter<Object>> delFuts = new ConcurrentHashMap8<>();
-
- /** Delete message listener. */
- private final GridMessageListener delMsgLsnr = new FormatMessageListener();
-
- /** Format discovery listener. */
- private final GridLocalEventListener delDiscoLsnr = new FormatDiscoveryListener();
-
/** Local metrics holder. */
private final IgfsLocalMetrics metrics = new IgfsLocalMetrics();
/** Client log directory. */
private volatile String logDir;
- /** Message topic. */
- private Object topic;
-
/** Eviction policy (if set). */
private IgfsPerBlockLruEvictionPolicy evictPlc;
@@ -292,11 +274,6 @@ public final class IgfsImpl implements IgfsEx {
}
}
- topic = F.isEmpty(name()) ? TOPIC_IGFS : TOPIC_IGFS.topic(name());
-
- igfsCtx.kernalContext().io().addMessageListener(topic, delMsgLsnr);
- igfsCtx.kernalContext().event().addLocalEventListener(delDiscoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED);
-
dualPool = secondaryFs != null ? new IgniteThreadPoolExecutor(4, Integer.MAX_VALUE, 5000L,
new LinkedBlockingQueue<Runnable>(), new IgfsThreadFactory(cfg.getName()), null) : null;
}
@@ -332,9 +309,6 @@ public final class IgfsImpl implements IgfsEx {
}
}
- igfsCtx.kernalContext().io().removeMessageListener(topic, delMsgLsnr);
- igfsCtx.kernalContext().event().removeLocalEventListener(delDiscoLsnr);
-
// Restore interrupted flag.
if (interrupted)
Thread.currentThread().interrupt();
@@ -1381,7 +1355,25 @@ public final class IgfsImpl implements IgfsEx {
/** {@inheritDoc} */
@Override public void format() {
try {
- formatAsync().get();
+ IgniteUuid id = meta.format();
+
+ // If ID is null, then file system is already empty.
+ if (id == null)
+ return;
+
+ while (true) {
+ if (enterBusy()) {
+ try {
+ if (!meta.exists(id))
+ return;
+ }
+ finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ U.sleep(10);
+ }
}
catch (Exception e) {
throw IgfsUtils.toIgfsException(e);
@@ -1394,69 +1386,16 @@ public final class IgfsImpl implements IgfsEx {
* @return Future.
*/
IgniteInternalFuture<?> formatAsync() {
- try {
- IgniteUuid id = meta.format();
-
- if (id == null)
- return new GridFinishedFuture<Object>();
- else {
- GridFutureAdapter<Object> fut = new GridFutureAdapter<>();
-
- GridFutureAdapter<Object> oldFut = delFuts.putIfAbsent(id, fut);
-
- if (oldFut != null)
- return oldFut;
- else {
- if (!meta.exists(id)) {
- // Safety in case response message was received before we put future into collection.
- fut.onDone();
-
- delFuts.remove(id, fut);
- }
+ GridFutureAdapter<?> fut = new GridFutureAdapter<>();
- return fut;
- }
- }
- }
- catch (IgniteCheckedException e) {
- return new GridFinishedFuture<Object>(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> awaitDeletesAsync() throws IgniteCheckedException {
- Collection<IgniteUuid> ids = meta.pendingDeletes();
+ Thread t = new Thread(new FormatRunnable(fut), "igfs-format-" + cfg.getName() + "-" +
+ FORMAT_THREAD_IDX_GEN.incrementAndGet());
- if (!ids.isEmpty()) {
- if (log.isDebugEnabled())
- log.debug("Constructing delete future for trash entries: " + ids);
+ t.setDaemon(true);
- GridCompoundFuture<Object, Object> resFut = new GridCompoundFuture<>();
+ t.start();
- for (IgniteUuid id : ids) {
- GridFutureAdapter<Object> fut = new GridFutureAdapter<>();
-
- IgniteInternalFuture<Object> oldFut = delFuts.putIfAbsent(id, fut);
-
- if (oldFut != null)
- resFut.add(oldFut);
- else {
- if (meta.exists(id))
- resFut.add(fut);
- else {
- fut.onDone();
-
- delFuts.remove(id, fut);
- }
- }
- }
-
- resFut.markInitialized();
-
- return resFut;
- }
- else
- return new GridFinishedFuture<>();
+ return fut;
}
/**
@@ -1482,24 +1421,6 @@ public final class IgfsImpl implements IgfsEx {
return new FileDescriptor(parentId, path.name(), fileInfo.id(), fileInfo.isFile());
}
- /**
- * Check whether IGFS with the same name exists among provided attributes.
- *
- * @param attrs Attributes.
- * @return {@code True} in case IGFS with the same name exists among provided attributes
- */
- private boolean sameIgfs(IgfsAttributes[] attrs) {
- if (attrs != null) {
- String igfsName = name();
-
- for (IgfsAttributes attr : attrs) {
- if (F.eq(igfsName, attr.igfsName()))
- return true;
- }
- }
- return false;
- }
-
/** {@inheritDoc} */
@Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr,
Collection<IgfsPath> paths, @Nullable T arg) {
@@ -1905,81 +1826,6 @@ public final class IgfsImpl implements IgfsEx {
}
}
- /**
- * Format message listener required for format action completion.
- */
- private class FormatMessageListener implements GridMessageListener {
- /** {@inheritDoc} */
- @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
- @Override public void onMessage(UUID nodeId, Object msg) {
- if (msg instanceof IgfsDeleteMessage) {
- ClusterNode node = igfsCtx.kernalContext().discovery().node(nodeId);
-
- if (node != null) {
- if (sameIgfs((IgfsAttributes[]) node.attribute(ATTR_IGFS))) {
- IgfsDeleteMessage msg0 = (IgfsDeleteMessage)msg;
-
- try {
- msg0.finishUnmarshal(igfsCtx.kernalContext().config().getMarshaller(), null);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to unmarshal message (will ignore): " + msg0, e);
-
- return;
- }
-
- assert msg0.id() != null;
-
- GridFutureAdapter<?> fut = delFuts.remove(msg0.id());
-
- if (fut != null) {
- if (msg0.error() == null)
- fut.onDone();
- else
- fut.onDone(msg0.error());
- }
- }
- }
- }
- }
- }
-
- /**
- * Discovery listener required for format actions completion.
- */
- private class FormatDiscoveryListener implements GridLocalEventListener {
- /** {@inheritDoc} */
- @Override public void onEvent(Event evt) {
- assert evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED;
-
- DiscoveryEvent evt0 = (DiscoveryEvent)evt;
-
- if (evt0.eventNode() != null) {
- if (sameIgfs((IgfsAttributes[]) evt0.eventNode().attribute(ATTR_IGFS))) {
- Collection<IgniteUuid> rmv = new HashSet<>();
-
- for (Map.Entry<IgniteUuid, GridFutureAdapter<Object>> fut : delFuts.entrySet()) {
- IgniteUuid id = fut.getKey();
-
- try {
- if (!meta.exists(id)) {
- fut.getValue().onDone();
-
- rmv.add(id);
- }
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to check file existence: " + id, e);
- }
- }
-
- for (IgniteUuid id : rmv)
- delFuts.remove(id);
- }
- }
- }
- }
-
/** {@inheritDoc} */
@Override public IgniteUuid nextAffinityKey() {
return safeOp(new Callable<IgniteUuid>() {
@@ -2079,4 +1925,39 @@ public final class IgfsImpl implements IgfsEx {
return t;
}
}
+
+ /**
+ * Format runnable.
+ */
+ private class FormatRunnable implements Runnable {
+ /** Target future. */
+ private final GridFutureAdapter<?> fut;
+
+ /**
+ * Constructor.
+ *
+ * @param fut Future.
+ */
+ public FormatRunnable(GridFutureAdapter<?> fut) {
+ this.fut = fut;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ IgfsException err = null;
+
+ try {
+ format();
+ }
+ catch (Throwable err0) {
+ err = IgfsUtils.toIgfsException(err0);
+ }
+ finally {
+ if (err == null)
+ fut.onDone();
+ else
+ fut.onDone(err);
+ }
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/c300448b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
index d7aae5c..7310b4c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
@@ -185,7 +185,7 @@ public class IgfsUtils {
* @return Converted IGFS exception.
*/
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
- public static IgfsException toIgfsException(Exception err) {
+ public static IgfsException toIgfsException(Throwable err) {
IgfsException err0 = err instanceof IgfsException ? (IgfsException)err : null;
IgfsException igfsErr = X.cause(err, IgfsException.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/c300448b/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerSelfTest.java
index fd4ec17..4e0f12b 100644
--- a/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/igfs/IgfsFragmentizerSelfTest.java
@@ -239,8 +239,6 @@ public class IgfsFragmentizerSelfTest extends IgfsFragmentizerAbstractSelfTest {
igfs.format();
- igfs.awaitDeletesAsync().get();
-
GridTestUtils.retryAssert(log, 50, 100, new CA() {
@Override public void apply() {
for (int i = 0; i < NODE_CNT; i++) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/c300448b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java
index 3933e86..266945f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSizeSelfTest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.igfs;
import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cluster.ClusterNode;
@@ -41,27 +40,21 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
-import org.apache.ignite.transactions.Transaction;
import org.jsr166.ThreadLocalRandom8;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicReference;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cache.CacheRebalanceMode.SYNC;
-import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
-import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
/**
* {@link IgfsAttributes} test case.
@@ -256,41 +249,6 @@ public class IgfsSizeSelfTest extends IgfsCommonAbstractTest {
}
/**
- * Ensure that exception is not thrown in case PARTITIONED cache is oversized, but data is deleted concurrently.
- *
- * @throws Exception If failed.
- */
- public void testPartitionedOversizeDelay() throws Exception {
- cacheMode = PARTITIONED;
- nearEnabled = true;
-
- checkOversizeDelay();
- }
-
- /**
- * Ensure that exception is not thrown in case co-located cache is oversized, but data is deleted concurrently.
- *
- * @throws Exception If failed.
- */
- public void testColocatedOversizeDelay() throws Exception {
- cacheMode = PARTITIONED;
- nearEnabled = false;
-
- checkOversizeDelay();
- }
-
- /**
- * Ensure that exception is not thrown in case REPLICATED cache is oversized, but data is deleted concurrently.
- *
- * @throws Exception If failed.
- */
- public void testReplicatedOversizeDelay() throws Exception {
- cacheMode = REPLICATED;
-
- checkOversizeDelay();
- }
-
- /**
* Ensure that IGFS size is correctly updated in case of preloading for PARTITIONED cache.
*
* @throws Exception If failed.
@@ -484,97 +442,6 @@ public class IgfsSizeSelfTest extends IgfsCommonAbstractTest {
}
/**
- * Ensure that exception is not thrown or thrown with some delay when there is something in trash directory.
- *
- * @throws Exception If failed.
- */
- private void checkOversizeDelay() throws Exception {
- final CountDownLatch latch = new CountDownLatch(1);
-
- igfsMaxData = 256;
- trashPurgeTimeout = 2000;
-
- startUp();
-
- IgfsImpl igfs = igfs(0);
-
- final IgfsPath path = new IgfsPath("/file");
- final IgfsPath otherPath = new IgfsPath("/fileOther");
-
- // Fill cache with data up to it's limit.
- IgfsOutputStream os = igfs.create(path, false);
- os.write(chunk((int)igfsMaxData));
- os.close();
-
- final IgniteCache<IgniteUuid, IgfsEntryInfo> metaCache = igfs.context().kernalContext().cache().jcache(
- igfs.configuration().getMetaCacheName());
-
- // Start a transaction in a separate thread which will lock file ID.
- final IgniteUuid id = igfs.context().meta().fileId(path);
- final IgfsEntryInfo info = igfs.context().meta().info(id);
-
- final AtomicReference<Throwable> err = new AtomicReference<>();
-
- try {
- new Thread(new Runnable() {
- @Override public void run() {
- try {
-
- try (Transaction tx = metaCache.unwrap(Ignite.class).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
- metaCache.get(id);
-
- latch.await();
-
- U.sleep(1000); // Sleep here so that data manager could "see" oversize.
-
- tx.commit();
- }
- }
- catch (Throwable e) {
- err.set(e);
- }
- }
- }).start();
-
- // Now add file ID to trash listing so that delete worker could "see" it.
- IgniteUuid trashId = IgfsUtils.randomTrashId();
-
- try (Transaction tx = metaCache.unwrap(Ignite.class).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
- Map<String, IgfsListingEntry> listing = Collections.singletonMap(path.name(),
- new IgfsListingEntry(info));
-
- // Clear root listing.
- metaCache.put(IgfsUtils.ROOT_ID, IgfsUtils.createDirectory(IgfsUtils.ROOT_ID));
-
- // Add file to trash listing.
- IgfsEntryInfo trashInfo = metaCache.get(trashId);
-
- if (trashInfo == null)
- metaCache.put(trashId, IgfsUtils.createDirectory(trashId).listing(listing));
- else
- metaCache.put(trashId, trashInfo.listing(listing));
-
- tx.commit();
- }
-
- assert metaCache.get(trashId) != null;
-
- // Now the file is locked and is located in trash, try adding some more data.
- os = igfs.create(otherPath, false);
- os.write(new byte[1]);
-
- latch.countDown();
-
- os.close();
-
- assert err.get() == null;
- }
- finally {
- latch.countDown(); // Safety.
- }
- }
-
- /**
* Ensure that IGFS size is correctly updated in case of preloading.
*
* @throws Exception If failed.
http://git-wip-us.apache.org/repos/asf/ignite/blob/c300448b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
index b38f3a2..ffa6f7d 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultMapReducePlannerSelfTest.java
@@ -41,7 +41,6 @@ import org.apache.ignite.igfs.mapreduce.IgfsTask;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.IgniteClusterEx;
import org.apache.ignite.internal.processors.cache.GridCacheUtilityKey;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
@@ -754,11 +753,6 @@ public class HadoopDefaultMapReducePlannerSelfTest extends HadoopAbstractSelfTes
}
/** {@inheritDoc} */
- @Override public IgniteInternalFuture<?> awaitDeletesAsync() throws IgniteCheckedException {
- return null;
- }
-
- /** {@inheritDoc} */
@Nullable @Override public String clientLogDirectory() {
return null;
}
[10/11] ignite git commit: IGNITE-3264: IGFS: Reworked output stream.
Posted by vo...@apache.org.
IGNITE-3264: IGFS: Reworked output stream.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7761e5f5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7761e5f5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7761e5f5
Branch: refs/heads/master
Commit: 7761e5f573d9f39b5cc542f3d67bacab4e66609a
Parents: 177ebd5
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Jun 8 15:13:33 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Jun 8 15:13:33 2016 +0300
----------------------------------------------------------------------
.../ignite/internal/processors/igfs/IgfsDeleteWorker.java | 7 +++++--
1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/7761e5f5/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
index 3d41071..8dcccea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDeleteWorker.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.igfs;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.igfs.IgfsPath;
import org.apache.ignite.internal.IgniteFutureCancelledCheckedException;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
@@ -36,7 +35,6 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import static org.apache.ignite.internal.GridTopic.TOPIC_IGFS;
import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_PURGED;
/**
@@ -49,6 +47,9 @@ public class IgfsDeleteWorker extends IgfsThread {
/** How many files/folders to delete at once (i.e in a single transaction). */
private static final int MAX_DELETE_BATCH = 100;
+ /** IGFS context. */
+ private final IgfsContext igfsCtx;
+
/** Metadata manager. */
private final IgfsMetaManager meta;
@@ -78,6 +79,8 @@ public class IgfsDeleteWorker extends IgfsThread {
IgfsDeleteWorker(IgfsContext igfsCtx) {
super("igfs-delete-worker%" + igfsCtx.igfs().name() + "%" + igfsCtx.kernalContext().localNodeId() + "%");
+ this.igfsCtx = igfsCtx;
+
meta = igfsCtx.meta();
data = igfsCtx.data();
[09/11] ignite git commit: IGNITE-3264: IGFS: Reworked output stream.
Posted by vo...@apache.org.
IGNITE-3264: IGFS: Reworked output stream.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/177ebd5c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/177ebd5c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/177ebd5c
Branch: refs/heads/master
Commit: 177ebd5c75b38e772e8889fa3589d86ab3fa8f70
Parents: 4273950
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Jun 8 14:41:49 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Jun 8 15:12:53 2016 +0300
----------------------------------------------------------------------
.../configuration/FileSystemConfiguration.java | 34 ++
.../processors/igfs/IgfsDataManager.java | 28 +-
.../internal/processors/igfs/IgfsImpl.java | 45 +-
.../processors/igfs/IgfsMetaManager.java | 260 +++++----
.../igfs/IgfsOutputStreamAdapter.java | 265 ---------
.../processors/igfs/IgfsOutputStreamImpl.java | 558 +++++++++++--------
.../igfs/meta/IgfsMetaFileUnlockProcessor.java | 69 ++-
.../processors/igfs/IgfsAbstractSelfTest.java | 12 +-
.../igfs/IgfsDataManagerSelfTest.java | 12 +-
.../igfs/IgfsProcessorValidationSelfTest.java | 38 +-
.../processors/igfs/IgfsTaskSelfTest.java | 2 +-
11 files changed, 642 insertions(+), 681 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/177ebd5c/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
index fa15670..28dd611 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
@@ -20,6 +20,7 @@ package org.apache.ignite.configuration;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.igfs.IgfsIpcEndpointConfiguration;
import org.apache.ignite.igfs.IgfsMode;
+import org.apache.ignite.igfs.IgfsOutputStream;
import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -91,6 +92,9 @@ public class FileSystemConfiguration {
/** Default value of relaxed consistency flag. */
public static final boolean DFLT_RELAXED_CONSISTENCY = true;
+ /** Default value of update file length on flush flag. */
+ public static final boolean DFLT_UPDATE_FILE_LEN_ON_FLUSH = false;
+
/** IGFS instance name. */
private String name;
@@ -178,6 +182,9 @@ public class FileSystemConfiguration {
/** Relaxed consistency flag. */
private boolean relaxedConsistency = DFLT_RELAXED_CONSISTENCY;
+ /** Update file length on flush flag. */
+ private boolean updateFileLenOnFlush = DFLT_UPDATE_FILE_LEN_ON_FLUSH;
+
/**
* Constructs default configuration.
*/
@@ -225,6 +232,7 @@ public class FileSystemConfiguration {
relaxedConsistency = cfg.isRelaxedConsistency();
seqReadsBeforePrefetch = cfg.getSequentialReadsBeforePrefetch();
trashPurgeTimeout = cfg.getTrashPurgeTimeout();
+ updateFileLenOnFlush = cfg.isUpdateFileLengthOnFlush();
}
/**
@@ -922,6 +930,32 @@ public class FileSystemConfiguration {
this.relaxedConsistency = relaxedConsistency;
}
+ /**
+ * Get whether to update file length on flush.
+ * <p>
+ * Controls whether to update file length or not when {@link IgfsOutputStream#flush()} method is invoked. You
+ * may want to set this property to true in case you want to read from a file which is being written at the
+ * same time.
+ * <p>
+ * Defaults to {@link #DFLT_UPDATE_FILE_LEN_ON_FLUSH}.
+ *
+ * @return Whether to update file length on flush.
+ */
+ public boolean isUpdateFileLengthOnFlush() {
+ return updateFileLenOnFlush;
+ }
+
+ /**
+ * Set whether to update file length on flush.
+ * <p>
+ * See {@link #isUpdateFileLengthOnFlush()} for more information.
+ *
+ * @param updateFileLenOnFlush Whether to update file length on flush.
+ */
+ public void setUpdateFileLengthOnFlush(boolean updateFileLenOnFlush) {
+ this.updateFileLenOnFlush = updateFileLenOnFlush;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(FileSystemConfiguration.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/177ebd5c/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index 66da05d..03777c3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -473,18 +473,18 @@ public class IgfsDataManager extends IgfsManager {
/**
* Registers write future in igfs data manager.
*
- * @param fileInfo File info of file opened to write.
+ * @param fileId File ID.
* @return Future that will be completed when all ack messages are received or when write failed.
*/
- public IgniteInternalFuture<Boolean> writeStart(IgfsEntryInfo fileInfo) {
- WriteCompletionFuture fut = new WriteCompletionFuture(fileInfo.id());
+ public IgniteInternalFuture<Boolean> writeStart(IgniteUuid fileId) {
+ WriteCompletionFuture fut = new WriteCompletionFuture(fileId);
- WriteCompletionFuture oldFut = pendingWrites.putIfAbsent(fileInfo.id(), fut);
+ WriteCompletionFuture oldFut = pendingWrites.putIfAbsent(fileId, fut);
- assert oldFut == null : "Opened write that is being concurrently written: " + fileInfo;
+ assert oldFut == null : "Opened write that is being concurrently written: " + fileId;
if (log.isDebugEnabled())
- log.debug("Registered write completion future for file output stream [fileInfo=" + fileInfo +
+ log.debug("Registered write completion future for file output stream [fileId=" + fileId +
", fut=" + fut + ']');
return fut;
@@ -493,17 +493,23 @@ public class IgfsDataManager extends IgfsManager {
/**
* Notifies data manager that no further writes will be performed on stream.
*
- * @param fileInfo File info being written.
+ * @param fileId File ID.
+ * @param await Await completion.
+ * @throws IgniteCheckedException If failed.
*/
- public void writeClose(IgfsEntryInfo fileInfo) {
- WriteCompletionFuture fut = pendingWrites.get(fileInfo.id());
+ public void writeClose(IgniteUuid fileId, boolean await) throws IgniteCheckedException {
+ WriteCompletionFuture fut = pendingWrites.get(fileId);
- if (fut != null)
+ if (fut != null) {
fut.markWaitingLastAck();
+
+ if (await)
+ fut.get();
+ }
else {
if (log.isDebugEnabled())
log.debug("Failed to find write completion future for file in pending write map (most likely it was " +
- "failed): " + fileInfo);
+ "failed): " + fileId);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/177ebd5c/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 4dfb040..eabec7c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -1051,7 +1051,7 @@ public final class IgfsImpl implements IgfsEx {
batch = newBatch(path, desc.out());
- IgfsEventAwareOutputStream os = new IgfsEventAwareOutputStream(path, desc.info(),
+ IgfsOutputStreamImpl os = new IgfsOutputStreamImpl(igfsCtx, path, desc.info(),
bufferSize(bufSize), mode, batch);
IgfsUtils.sendEvents(igfsCtx.kernalContext(), path, EVT_IGFS_FILE_OPENED_WRITE);
@@ -1081,7 +1081,7 @@ public final class IgfsImpl implements IgfsEx {
assert res != null;
- return new IgfsEventAwareOutputStream(path, res, bufferSize(bufSize), mode, null);
+ return new IgfsOutputStreamImpl(igfsCtx, path, res, bufferSize(bufSize), mode, null);
}
});
}
@@ -1116,7 +1116,7 @@ public final class IgfsImpl implements IgfsEx {
batch = newBatch(path, desc.out());
- return new IgfsEventAwareOutputStream(path, desc.info(), bufferSize(bufSize), mode, batch);
+ return new IgfsOutputStreamImpl(igfsCtx, path, desc.info(), bufferSize(bufSize), mode, batch);
}
final List<IgniteUuid> ids = meta.idsForPath(path);
@@ -1157,7 +1157,7 @@ public final class IgfsImpl implements IgfsEx {
assert res != null;
- return new IgfsEventAwareOutputStream(path, res, bufferSize(bufSize), mode, null);
+ return new IgfsOutputStreamImpl(igfsCtx, path, res, bufferSize(bufSize), mode, null);
}
});
}
@@ -1680,43 +1680,6 @@ public final class IgfsImpl implements IgfsEx {
}
/**
- * IGFS output stream extension that fires events.
- */
- private class IgfsEventAwareOutputStream extends IgfsOutputStreamImpl {
- /** Close guard. */
- private final AtomicBoolean closeGuard = new AtomicBoolean(false);
-
- /**
- * Constructs file output stream.
- *
- * @param path Path to stored file.
- * @param fileInfo File info.
- * @param bufSize The size of the buffer to be used.
- * @param mode IGFS mode.
- * @param batch Optional secondary file system batch.
- */
- IgfsEventAwareOutputStream(IgfsPath path, IgfsEntryInfo fileInfo, int bufSize, IgfsMode mode,
- @Nullable IgfsFileWorkerBatch batch) {
- super(igfsCtx, path, fileInfo, bufSize, mode, batch, metrics);
-
- metrics.incrementFilesOpenedForWrite();
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("NonSynchronizedMethodOverridesSynchronizedMethod")
- @Override protected void onClose() throws IOException {
- if (closeGuard.compareAndSet(false, true)) {
- super.onClose();
-
- metrics.decrementFilesOpenedForWrite();
-
- if (evts.isRecordable(EVT_IGFS_FILE_CLOSED_WRITE))
- evts.record(new IgfsEvent(path, localNode(), EVT_IGFS_FILE_CLOSED_WRITE, bytes()));
- }
- }
- }
-
- /**
* IGFS input stream extension that fires events.
*/
private class IgfsEventAwareInputStream extends IgfsInputStreamImpl {
http://git-wip-us.apache.org/repos/asf/ignite/blob/177ebd5c/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index e1a181d..04e2139 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -44,7 +44,6 @@ import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheInternal;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
@@ -281,24 +280,6 @@ public class IgfsMetaManager extends IgfsManager {
}
/**
- * Return nodes where meta cache is defined.
- *
- * @return Nodes where meta cache is defined.
- */
- Collection<ClusterNode> metaCacheNodes() {
- if (busyLock.enterBusy()) {
- try {
- return igfsCtx.kernalContext().discovery().cacheNodes(metaCache.name(), AffinityTopologyVersion.NONE);
- }
- finally {
- busyLock.leaveBusy();
- }
- }
- else
- throw new IllegalStateException("Failed to get meta cache nodes because Grid is stopping.");
- }
-
- /**
* Gets file ID for specified path.
*
* @param path Path.
@@ -631,21 +612,36 @@ public class IgfsMetaManager extends IgfsManager {
}
/**
- * Remove explicit lock on file held by the current thread.
+ * Remove explicit lock on file held by the current stream.
*
- * @param info File info to unlock.
+ * @param fileId File ID.
+ * @param lockId Lock ID.
* @param modificationTime Modification time to write to file info.
* @throws IgniteCheckedException If failed.
*/
- public void unlock(final IgfsEntryInfo info, final long modificationTime) throws IgniteCheckedException {
- validTxState(false);
+ public void unlock(final IgniteUuid fileId, final IgniteUuid lockId, final long modificationTime)
+ throws IgniteCheckedException {
+ unlock(fileId, lockId, modificationTime, false, 0, null);
+ }
- assert info != null;
+ /**
+ * Remove explicit lock on file held by the current stream.
+ *
+ * @param fileId File ID.
+ * @param lockId Lock ID.
+ * @param modificationTime Modification time to write to file info.
+ * @param updateSpace Whether to update space.
+ * @param space Space.
+ * @param affRange Affinity range.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void unlock(final IgniteUuid fileId, final IgniteUuid lockId, final long modificationTime,
+ final boolean updateSpace, final long space, @Nullable final IgfsFileAffinityRange affRange)
+ throws IgniteCheckedException {
+ validTxState(false);
if (busyLock.enterBusy()) {
try {
- final IgniteUuid lockId = info.lockId();
-
if (lockId == null)
return;
@@ -657,8 +653,6 @@ public class IgfsMetaManager extends IgfsManager {
@Override public Void applyx() throws IgniteCheckedException {
validTxState(true);
- IgniteUuid fileId = info.id();
-
// Lock file ID for this transaction.
IgfsEntryInfo oldInfo = info(fileId);
@@ -666,12 +660,13 @@ public class IgfsMetaManager extends IgfsManager {
throw fsException(new IgfsPathNotFoundException("Failed to unlock file (file not " +
"found): " + fileId));
- if (!F.eq(info.lockId(), oldInfo.lockId()))
+ if (!F.eq(lockId, oldInfo.lockId()))
throw new IgniteCheckedException("Failed to unlock file (inconsistent file lock ID) " +
- "[fileId=" + fileId + ", lockId=" + info.lockId() + ", actualLockId=" +
+ "[fileId=" + fileId + ", lockId=" + lockId + ", actualLockId=" +
oldInfo.lockId() + ']');
- id2InfoPrj.invoke(fileId, new IgfsMetaFileUnlockProcessor(modificationTime));
+ id2InfoPrj.invoke(fileId,
+ new IgfsMetaFileUnlockProcessor(modificationTime, updateSpace, space, affRange));
return null;
}
@@ -689,7 +684,7 @@ public class IgfsMetaManager extends IgfsManager {
}
}
else
- throw new IllegalStateException("Failed to unlock file system entry because Grid is stopping: " + info);
+ throw new IllegalStateException("Failed to unlock file system entry because Grid is stopping: " + fileId);
}
/**
@@ -1449,38 +1444,6 @@ public class IgfsMetaManager extends IgfsManager {
}
/**
- * Check whether there are any pending deletes and return collection of pending delete entry IDs.
- *
- * @return Collection of entry IDs to be deleted.
- * @throws IgniteCheckedException If operation failed.
- */
- public Collection<IgniteUuid> pendingDeletes() throws IgniteCheckedException {
- if (busyLock.enterBusy()) {
- try {
- Collection<IgniteUuid> ids = new HashSet<>();
-
- for (int i = 0; i < IgfsUtils.TRASH_CONCURRENCY; i++) {
- IgniteUuid trashId = IgfsUtils.trashId(i);
-
- IgfsEntryInfo trashInfo = getInfo(trashId);
-
- if (trashInfo != null && trashInfo.hasChildren()) {
- for (IgfsListingEntry entry : trashInfo.listing().values())
- ids.add(entry.fileId());
- }
- }
-
- return ids;
- }
- finally {
- busyLock.leaveBusy();
- }
- }
- else
- throw new IllegalStateException("Failed to get pending deletes because Grid is stopping.");
- }
-
- /**
* Update file info (file properties) in cache in existing transaction.
*
* @param fileId File ID to update information for.
@@ -1545,27 +1508,26 @@ public class IgfsMetaManager extends IgfsManager {
/**
* Reserve space for file.
*
- * @param path File path.
* @param fileId File ID.
* @param space Space.
* @param affRange Affinity range.
* @return New file info.
*/
- public IgfsEntryInfo reserveSpace(IgfsPath path, IgniteUuid fileId, long space, IgfsFileAffinityRange affRange)
+ public IgfsEntryInfo reserveSpace(IgniteUuid fileId, long space, IgfsFileAffinityRange affRange)
throws IgniteCheckedException {
validTxState(false);
if (busyLock.enterBusy()) {
try {
if (log.isDebugEnabled())
- log.debug("Reserve file space [path=" + path + ", id=" + fileId + ']');
+ log.debug("Reserve file space: " + fileId);
try (IgniteInternalTx tx = startTx()) {
// Lock file ID for this transaction.
IgfsEntryInfo oldInfo = info(fileId);
if (oldInfo == null)
- throw fsException("File has been deleted concurrently [path=" + path + ", id=" + fileId + ']');
+ throw fsException("File has been deleted concurrently: " + fileId);
IgfsEntryInfo newInfo =
invokeAndGet(fileId, new IgfsMetaFileReserveSpaceProcessor(space, affRange));
@@ -1583,8 +1545,7 @@ public class IgfsMetaManager extends IgfsManager {
}
}
else
- throw new IllegalStateException("Failed to reseve file space because Grid is stopping [path=" + path +
- ", id=" + fileId + ']');
+ throw new IllegalStateException("Failed to reserve file space because Grid is stopping:" + fileId);
}
/**
@@ -1969,7 +1930,7 @@ public class IgfsMetaManager extends IgfsManager {
}
/**
- * Create the file in DUAL mode.
+ * A delegate method that performs file creation in the synchronization task.
*
* @param fs File system.
* @param path Path.
@@ -2002,29 +1963,8 @@ public class IgfsMetaManager extends IgfsManager {
// Events to fire (can be done outside of a transaction).
final Deque<IgfsEvent> pendingEvts = new LinkedList<>();
- SynchronizationTask<IgfsSecondaryOutputStreamDescriptor> task =
- new SynchronizationTask<IgfsSecondaryOutputStreamDescriptor>() {
- /** Container for the secondary file system output stream. */
- private final T1<OutputStream> outT1 = new T1<>(null);
-
- @Override public IgfsSecondaryOutputStreamDescriptor onSuccess(Map<IgfsPath,
- IgfsEntryInfo> infos) throws Exception {
- return onSuccessCreate(fs, path, simpleCreate, props,
- overwrite, bufSize, replication, blockSize, affKey, infos, pendingEvts, outT1);
- }
-
- @Override public IgfsSecondaryOutputStreamDescriptor onFailure(Exception err)
- throws IgniteCheckedException {
- U.closeQuiet(outT1.get());
-
- U.error(log, "File create in DUAL mode failed [path=" + path + ", simpleCreate=" +
- simpleCreate + ", props=" + props + ", overwrite=" + overwrite + ", bufferSize=" +
- bufSize + ", replication=" + replication + ", blockSize=" + blockSize + ']', err);
-
- throw new IgniteCheckedException("Failed to create the file due to secondary file system " +
- "exception: " + path, err);
- }
- };
+ CreateFileSynchronizationTask task = new CreateFileSynchronizationTask(fs, path, simpleCreate, props,
+ overwrite, bufSize, replication, blockSize, affKey, pendingEvts);
try {
return synchronizeAndExecute(task, fs, false, path.parent());
@@ -3004,29 +2944,6 @@ public class IgfsMetaManager extends IgfsManager {
}
/**
- * Synchronization task interface.
- */
- private static interface SynchronizationTask<T> {
- /**
- * Callback handler in case synchronization was successful.
- *
- * @param infos Map from paths to corresponding infos.
- * @return Task result.
- * @throws Exception If failed.
- */
- public T onSuccess(Map<IgfsPath, IgfsEntryInfo> infos) throws Exception;
-
- /**
- * Callback handler in case synchronization failed.
- *
- * @param err Optional exception.
- * @return Task result.
- * @throws IgniteCheckedException In case exception is to be thrown in that case.
- */
- public T onFailure(Exception err) throws IgniteCheckedException;
- }
-
- /**
* Append routine.
*
* @param path Path.
@@ -3400,4 +3317,113 @@ public class IgfsMetaManager extends IgfsManager {
if (delWorker0 != null)
delWorker0.signal();
}
+
+ /**
+ * Synchronization task interface.
+ */
+ private static interface SynchronizationTask<T> {
+ /**
+ * Callback handler in case synchronization was successful.
+ *
+ * @param infos Map from paths to corresponding infos.
+ * @return Task result.
+ * @throws Exception If failed.
+ */
+ public T onSuccess(Map<IgfsPath, IgfsEntryInfo> infos) throws Exception;
+
+ /**
+ * Callback handler in case synchronization failed.
+ *
+ * @param err Optional exception.
+ * @return Task result.
+ * @throws IgniteCheckedException In case exception is to be thrown in that case.
+ */
+ public T onFailure(Exception err) throws IgniteCheckedException;
+ }
+
+ /**
+ * Synchronization task to create a file.
+ */
+ private class CreateFileSynchronizationTask implements SynchronizationTask<IgfsSecondaryOutputStreamDescriptor> {
+ /** Secondary file system. */
+ private IgfsSecondaryFileSystem fs;
+
+ /** Path. */
+ private IgfsPath path;
+
+ /** Simple create flag. */
+ private boolean simpleCreate;
+
+ /** Properties. */
+ private Map<String, String> props;
+
+ /** Overwrite flag. */
+ private boolean overwrite;
+
+ /** Buffer size. */
+ private int bufSize;
+
+ /** Replication factor. */
+ private short replication;
+
+ /** Block size. */
+ private long blockSize;
+
+ /** Affinity key. */
+ private IgniteUuid affKey;
+
+ /** Pending events. */
+ private Deque<IgfsEvent> pendingEvts;
+
+ /** Output stream to the secondary file system. */
+ private final T1<OutputStream> outT1 = new T1<>(null);
+
+ /**
+ * Constructor.
+ *
+ * @param fs Secondary file system.
+ * @param path Path.
+ * @param simpleCreate Simple create flag.
+ * @param props Properties.
+ * @param overwrite Overwrite flag.
+ * @param bufSize Buffer size.
+ * @param replication Replication factor.
+ * @param blockSize Block size.
+ * @param affKey Affinity key.
+ * @param pendingEvts Pending events.
+ */
+ public CreateFileSynchronizationTask(IgfsSecondaryFileSystem fs, IgfsPath path, boolean simpleCreate,
+ @Nullable Map<String, String> props, boolean overwrite, int bufSize, short replication, long blockSize,
+ IgniteUuid affKey, Deque<IgfsEvent> pendingEvts) {
+ this.fs = fs;
+ this.path = path;
+ this.simpleCreate = simpleCreate;
+ this.props = props;
+ this.overwrite = overwrite;
+ this.bufSize = bufSize;
+ this.replication = replication;
+ this.blockSize = blockSize;
+ this.affKey = affKey;
+ this.pendingEvts = pendingEvts;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsSecondaryOutputStreamDescriptor onSuccess(Map<IgfsPath, IgfsEntryInfo> infos)
+ throws Exception {
+ return onSuccessCreate(fs, path, simpleCreate, props,
+ overwrite, bufSize, replication, blockSize, affKey, infos, pendingEvts, outT1);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsSecondaryOutputStreamDescriptor onFailure(Exception err) throws IgniteCheckedException {
+ U.closeQuiet(outT1.get());
+
+ U.error(log, "File create in DUAL mode failed [path=" + path + ", simpleCreate=" +
+ simpleCreate + ", props=" + props + ", overwrite=" + overwrite + ", bufferSize=" +
+ bufSize + ", replication=" + replication + ", blockSize=" + blockSize + ']', err);
+
+ throw new IgniteCheckedException("Failed to create the file due to secondary file system " +
+ "exception: " + path, err);
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/177ebd5c/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamAdapter.java
deleted file mode 100644
index 43de61e..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamAdapter.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.igfs;
-
-import java.io.DataInput;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.igfs.IgfsOutputStream;
-import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.internal.util.typedef.internal.A;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Output stream to store data into grid cache with separate blocks.
- */
-@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
-abstract class IgfsOutputStreamAdapter extends IgfsOutputStream {
- /** Path to file. */
- protected final IgfsPath path;
-
- /** Buffer size. */
- private final int bufSize;
-
- /** Flag for this stream open/closed state. */
- private boolean closed;
-
- /** Local buffer to store stream data as consistent block. */
- private ByteBuffer buf;
-
- /** Bytes written. */
- @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
- protected long bytes;
-
- /** Time consumed by write operations. */
- protected long time;
-
- /**
- * Constructs file output stream.
- *
- * @param path Path to stored file.
- * @param bufSize The size of the buffer to be used.
- */
- IgfsOutputStreamAdapter(IgfsPath path, int bufSize) {
- assert path != null;
- assert bufSize > 0;
-
- this.path = path;
- this.bufSize = bufSize;
- }
-
- /**
- * Gets number of written bytes.
- *
- * @return Written bytes.
- */
- public long bytes() {
- return bytes;
- }
-
- /** {@inheritDoc} */
- @Override public synchronized void write(int b) throws IOException {
- checkClosed(null, 0);
-
- long startTime = System.nanoTime();
-
- b &= 0xFF;
-
- if (buf == null)
- buf = ByteBuffer.allocate(bufSize);
-
- buf.put((byte)b);
-
- if (buf.position() >= bufSize)
- sendData(true); // Send data to server.
-
- time += System.nanoTime() - startTime;
- }
-
- /** {@inheritDoc} */
- @Override public synchronized void write(byte[] b, int off, int len) throws IOException {
- A.notNull(b, "b");
-
- if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
- throw new IndexOutOfBoundsException("Invalid bounds [data.length=" + b.length + ", offset=" + off +
- ", length=" + len + ']');
- }
-
- checkClosed(null, 0);
-
- if (len == 0)
- return; // Done.
-
- long startTime = System.nanoTime();
-
- if (buf == null) {
- // Do not allocate and copy byte buffer if will send data immediately.
- if (len >= bufSize) {
- buf = ByteBuffer.wrap(b, off, len);
-
- sendData(false);
-
- return;
- }
-
- buf = ByteBuffer.allocate(Math.max(bufSize, len));
- }
-
- if (buf.remaining() < len)
- // Expand buffer capacity, if remaining size is less then data size.
- buf = ByteBuffer.allocate(buf.position() + len).put((ByteBuffer)buf.flip());
-
- assert len <= buf.remaining() : "Expects write data size less or equal then remaining buffer capacity " +
- "[len=" + len + ", buf.remaining=" + buf.remaining() + ']';
-
- buf.put(b, off, len);
-
- if (buf.position() >= bufSize)
- sendData(true); // Send data to server.
-
- time += System.nanoTime() - startTime;
- }
-
- /** {@inheritDoc} */
- @Override public synchronized void transferFrom(DataInput in, int len) throws IOException {
- checkClosed(in, len);
-
- long startTime = System.nanoTime();
-
- // Send all IPC data from the local buffer before streaming.
- if (buf != null && buf.position() > 0)
- sendData(true);
-
- try {
- storeDataBlocks(in, len);
- }
- catch (IgniteCheckedException e) {
- throw new IOException(e.getMessage(), e);
- }
-
- time += System.nanoTime() - startTime;
- }
-
- /**
- * Flushes this output stream and forces any buffered output bytes to be written out.
- *
- * @exception IOException if an I/O error occurs.
- */
- @Override public synchronized void flush() throws IOException {
- checkClosed(null, 0);
-
- // Send all IPC data from the local buffer.
- if (buf != null && buf.position() > 0)
- sendData(true);
- }
-
- /** {@inheritDoc} */
- @Override public final synchronized void close() throws IOException {
- // Do nothing if stream is already closed.
- if (closed)
- return;
-
- try {
- // Send all IPC data from the local buffer.
- try {
- flush();
- }
- finally {
- onClose(); // "onClose()" routine must be invoked anyway!
- }
- }
- finally {
- // Mark this stream closed AFTER flush.
- closed = true;
- }
- }
-
- /**
- * Store data blocks in file.<br/>
- * Note! If file concurrently deleted we'll get lost blocks.
- *
- * @param data Data to store.
- * @throws IgniteCheckedException If failed.
- */
- protected abstract void storeDataBlock(ByteBuffer data) throws IgniteCheckedException, IOException;
-
- /**
- * Store data blocks in file reading appropriate number of bytes from given data input.
- *
- * @param in Data input to read from.
- * @param len Data length to store.
- * @throws IgniteCheckedException If failed.
- */
- protected abstract void storeDataBlocks(DataInput in, int len) throws IgniteCheckedException, IOException;
-
- /**
- * Close callback. It will be called only once in synchronized section.
- *
- * @throws IOException If failed.
- */
- protected void onClose() throws IOException {
- // No-op.
- }
-
- /**
- * Validate this stream is open.
- *
- * @throws IOException If this stream is closed.
- */
- private void checkClosed(@Nullable DataInput in, int len) throws IOException {
- assert Thread.holdsLock(this);
-
- if (closed) {
- // Must read data from stream before throwing exception.
- if (in != null)
- in.skipBytes(len);
-
- throw new IOException("Stream has been closed: " + this);
- }
- }
-
- /**
- * Send all local-buffered data to server.
- *
- * @param flip Whether to flip buffer on sending data. We do not want to flip it if sending wrapped
- * byte array.
- * @throws IOException In case of IO exception.
- */
- private void sendData(boolean flip) throws IOException {
- assert Thread.holdsLock(this);
-
- try {
- if (flip)
- buf.flip();
-
- storeDataBlock(buf);
- }
- catch (IgniteCheckedException e) {
- throw new IOException("Failed to store data into file: " + path, e);
- }
-
- buf = null;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(IgfsOutputStreamAdapter.class, this);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/177ebd5c/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
index 21e5fb6..7741a25 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsOutputStreamImpl.java
@@ -18,12 +18,13 @@
package org.apache.ignite.internal.processors.igfs;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.events.IgfsEvent;
import org.apache.ignite.igfs.IgfsException;
import org.apache.ignite.igfs.IgfsMode;
+import org.apache.ignite.igfs.IgfsOutputStream;
import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.igfs.IgfsPathNotFoundException;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
+import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
@@ -32,8 +33,8 @@ import org.jetbrains.annotations.Nullable;
import java.io.DataInput;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicBoolean;
+import static org.apache.ignite.events.EventType.EVT_IGFS_FILE_CLOSED_WRITE;
import static org.apache.ignite.igfs.IgfsMode.DUAL_SYNC;
import static org.apache.ignite.igfs.IgfsMode.PRIMARY;
import static org.apache.ignite.igfs.IgfsMode.PROXY;
@@ -41,25 +42,44 @@ import static org.apache.ignite.igfs.IgfsMode.PROXY;
/**
* Output stream to store data into grid cache with separate blocks.
*/
-class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
+class IgfsOutputStreamImpl extends IgfsOutputStream {
/** Maximum number of blocks in buffer. */
private static final int MAX_BLOCKS_CNT = 16;
/** IGFS context. */
- private IgfsContext igfsCtx;
+ private final IgfsContext igfsCtx;
- /** Meta info manager. */
- private final IgfsMetaManager meta;
+ /** Path to file. */
+ private final IgfsPath path;
- /** Data manager. */
- private final IgfsDataManager data;
+ /** Buffer size. */
+ private final int bufSize;
+
+ /** IGFS mode. */
+ private final IgfsMode mode;
+
+ /** File worker batch. */
+ private final IgfsFileWorkerBatch batch;
+
+ /** Mutex for synchronization. */
+ private final Object mux = new Object();
+
+ /** Flag for this stream open/closed state. */
+ private boolean closed;
+
+ /** Local buffer to store stream data as consistent block. */
+ private ByteBuffer buf;
+
+ /** Bytes written. */
+ private long bytes;
+
+ /** Time consumed by write operations. */
+ private long time;
/** File descriptor. */
- @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
private IgfsEntryInfo fileInfo;
/** Space in file to write data. */
- @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
private long space;
/** Intermediate remainder to keep data. */
@@ -68,21 +88,6 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
/** Data length in remainder. */
private int remainderDataLen;
- /** Write completion future. */
- private final IgniteInternalFuture<Boolean> writeCompletionFut;
-
- /** IGFS mode. */
- private final IgfsMode mode;
-
- /** File worker batch. */
- private final IgfsFileWorkerBatch batch;
-
- /** Ensures that onClose)_ routine is called no more than once. */
- private final AtomicBoolean onCloseGuard = new AtomicBoolean();
-
- /** Local IGFS metrics. */
- private final IgfsLocalMetrics metrics;
-
/** Affinity written by this output stream. */
private IgfsFileAffinityRange streamRange;
@@ -95,259 +100,234 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
* @param bufSize The size of the buffer to be used.
* @param mode Grid IGFS mode.
* @param batch Optional secondary file system batch.
- * @param metrics Local IGFS metrics.
*/
IgfsOutputStreamImpl(IgfsContext igfsCtx, IgfsPath path, IgfsEntryInfo fileInfo, int bufSize, IgfsMode mode,
- @Nullable IgfsFileWorkerBatch batch, IgfsLocalMetrics metrics) {
- super(path, optimizeBufferSize(bufSize, fileInfo));
-
- assert fileInfo != null;
- assert fileInfo.isFile() : "Unexpected file info: " + fileInfo;
- assert mode != null && mode != PROXY;
- assert mode == PRIMARY && batch == null || batch != null;
- assert metrics != null;
+ @Nullable IgfsFileWorkerBatch batch) {
+ assert fileInfo != null && fileInfo.isFile() : "Unexpected file info: " + fileInfo;
+ assert mode != null && mode != PROXY && (mode == PRIMARY && batch == null || batch != null);
// File hasn't been locked.
if (fileInfo.lockId() == null)
throw new IgfsException("Failed to acquire file lock (concurrently modified?): " + path);
- assert !IgfsUtils.DELETE_LOCK_ID.equals(fileInfo.lockId());
+ synchronized (mux) {
+ this.path = path;
+ this.bufSize = optimizeBufferSize(bufSize, fileInfo);
+ this.igfsCtx = igfsCtx;
+ this.fileInfo = fileInfo;
+ this.mode = mode;
+ this.batch = batch;
- this.igfsCtx = igfsCtx;
- meta = igfsCtx.meta();
- data = igfsCtx.data();
+ streamRange = initialStreamRange(fileInfo);
- this.fileInfo = fileInfo;
- this.mode = mode;
- this.batch = batch;
- this.metrics = metrics;
-
- streamRange = initialStreamRange(fileInfo);
+ igfsCtx.data().writeStart(fileInfo.id());
+ }
- writeCompletionFut = data.writeStart(fileInfo);
+ igfsCtx.igfs().localMetrics().incrementFilesOpenedForWrite();
}
- /**
- * Optimize buffer size.
- *
- * @param bufSize Requested buffer size.
- * @param fileInfo File info.
- * @return Optimized buffer size.
- */
- @SuppressWarnings("IfMayBeConditional")
- private static int optimizeBufferSize(int bufSize, IgfsEntryInfo fileInfo) {
- assert bufSize > 0;
-
- if (fileInfo == null)
- return bufSize;
-
- int blockSize = fileInfo.blockSize();
+ /** {@inheritDoc} */
+ @Override public void write(int b) throws IOException {
+ synchronized (mux) {
+ checkClosed(null, 0);
- if (blockSize <= 0)
- return bufSize;
+ b &= 0xFF;
- if (bufSize <= blockSize)
- // Optimize minimum buffer size to be equal file's block size.
- return blockSize;
+ long startTime = System.nanoTime();
- int maxBufSize = blockSize * MAX_BLOCKS_CNT;
+ if (buf == null)
+ buf = allocateNewBuffer();
- if (bufSize > maxBufSize)
- // There is no profit or optimization from larger buffers.
- return maxBufSize;
+ buf.put((byte)b);
- if (fileInfo.length() == 0)
- // Make buffer size multiple of block size (optimized for new files).
- return bufSize / blockSize * blockSize;
+ sendBufferIfFull();
- return bufSize;
+ time += System.nanoTime() - startTime;
+ }
}
/** {@inheritDoc} */
- @Override protected synchronized void storeDataBlock(ByteBuffer block) throws IgniteCheckedException, IOException {
- int writeLen = block.remaining();
+ @SuppressWarnings("NullableProblems")
+ @Override public void write(byte[] b, int off, int len) throws IOException {
+ A.notNull(b, "b");
- preStoreDataBlocks(null, writeLen);
+ if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
+ throw new IndexOutOfBoundsException("Invalid bounds [data.length=" + b.length + ", offset=" + off +
+ ", length=" + len + ']');
+ }
- int blockSize = fileInfo.blockSize();
+ synchronized (mux) {
+ checkClosed(null, 0);
+
+ // Check if there is anything to write.
+ if (len == 0)
+ return;
- // If data length is not enough to fill full block, fill the remainder and return.
- if (remainderDataLen + writeLen < blockSize) {
- if (remainder == null)
- remainder = new byte[blockSize];
- else if (remainder.length != blockSize) {
- assert remainderDataLen == remainder.length;
+ long startTime = System.nanoTime();
- byte[] allocated = new byte[blockSize];
+ if (buf == null) {
+ if (len >= bufSize) {
+ // Send data right away.
+ ByteBuffer tmpBuf = ByteBuffer.wrap(b, off, len);
- U.arrayCopy(remainder, 0, allocated, 0, remainder.length);
+ send(tmpBuf, tmpBuf.remaining());
+ }
+ else {
+ buf = allocateNewBuffer();
- remainder = allocated;
+ buf.put(b, off, len);
+ }
}
+ else {
+ // Re-allocate buffer if needed.
+ if (buf.remaining() < len)
+ buf = ByteBuffer.allocate(buf.position() + len).put((ByteBuffer)buf.flip());
- block.get(remainder, remainderDataLen, writeLen);
+ buf.put(b, off, len);
- remainderDataLen += writeLen;
- }
- else {
- remainder = data.storeDataBlocks(fileInfo, fileInfo.length() + space, remainder, remainderDataLen, block,
- false, streamRange, batch);
+ sendBufferIfFull();
+ }
- remainderDataLen = remainder == null ? 0 : remainder.length;
+ time += System.nanoTime() - startTime;
}
}
/** {@inheritDoc} */
- @Override protected synchronized void storeDataBlocks(DataInput in, int len) throws IgniteCheckedException, IOException {
- preStoreDataBlocks(in, len);
-
- int blockSize = fileInfo.blockSize();
-
- // If data length is not enough to fill full block, fill the remainder and return.
- if (remainderDataLen + len < blockSize) {
- if (remainder == null)
- remainder = new byte[blockSize];
- else if (remainder.length != blockSize) {
- assert remainderDataLen == remainder.length;
+ @Override public void transferFrom(DataInput in, int len) throws IOException {
+ synchronized (mux) {
+ checkClosed(in, len);
- byte[] allocated = new byte[blockSize];
+ long startTime = System.nanoTime();
- U.arrayCopy(remainder, 0, allocated, 0, remainder.length);
+ // Clean-up local buffer before streaming.
+ sendBufferIfNotEmpty();
- remainder = allocated;
- }
-
- in.readFully(remainder, remainderDataLen, len);
-
- remainderDataLen += len;
- }
- else {
- remainder = data.storeDataBlocks(fileInfo, fileInfo.length() + space, remainder, remainderDataLen, in, len,
- false, streamRange, batch);
+ // Perform transfer.
+ send(in, len);
- remainderDataLen = remainder == null ? 0 : remainder.length;
+ time += System.nanoTime() - startTime;
}
}
/**
- * Initializes data loader if it was not initialized yet and updates written space.
+ * Flushes this output stream and forces any buffered output bytes to be written out.
*
- * @param len Data length to be written.
+ * @exception IOException if an I/O error occurs.
*/
- private void preStoreDataBlocks(@Nullable DataInput in, int len) throws IgniteCheckedException, IOException {
- // Check if any exception happened while writing data.
- if (writeCompletionFut.isDone()) {
- assert ((GridFutureAdapter)writeCompletionFut).isFailed();
+ @Override public void flush() throws IOException {
+ synchronized (mux) {
+ checkClosed(null, 0);
- if (in != null)
- in.skipBytes(len);
+ sendBufferIfNotEmpty();
- writeCompletionFut.get();
- }
+ flushRemainder();
+
+ awaitAcks();
+
+ // Update file length if needed.
+ if (igfsCtx.configuration().isUpdateFileLengthOnFlush() && space > 0) {
+ try {
+ IgfsEntryInfo fileInfo0 = igfsCtx.meta().reserveSpace(fileInfo.id(), space, streamRange);
+
+ if (fileInfo0 == null)
+ throw new IOException("File was concurrently deleted: " + path);
+ else
+ fileInfo = fileInfo0;
+
+ streamRange = initialStreamRange(fileInfo);
- bytes += len;
- space += len;
+ space = 0;
+ }
+ catch (IgniteCheckedException e) {
+ throw new IOException("Failed to update file length data [path=" + path +
+ ", space=" + space + ']', e);
+ }
+ }
+ }
}
/**
- * Flushes this output stream and forces any buffered output bytes to be written out.
+ * Await acknowledgments.
*
- * @exception IOException if an I/O error occurs.
+ * @throws IOException If failed.
*/
- @Override public synchronized void flush() throws IOException {
- boolean exists;
-
+ private void awaitAcks() throws IOException {
try {
- exists = meta.exists(fileInfo.id());
+ igfsCtx.data().awaitAllAcksReceived(fileInfo.id());
}
catch (IgniteCheckedException e) {
- throw new IOException("File to read file metadata: " + path, e);
+ throw new IOException("Failed to wait for flush acknowledge: " + fileInfo.id, e);
}
+ }
- if (!exists) {
- onClose(true);
-
- throw new IOException("File was concurrently deleted: " + path);
- }
-
- super.flush();
-
+ /**
+ * Flush remainder.
+ *
+ * @throws IOException If failed.
+ */
+ private void flushRemainder() throws IOException {
try {
if (remainder != null) {
- data.storeDataBlocks(fileInfo, fileInfo.length() + space, null, 0,
+ igfsCtx.data().storeDataBlocks(fileInfo, fileInfo.length() + space, null, 0,
ByteBuffer.wrap(remainder, 0, remainderDataLen), true, streamRange, batch);
remainder = null;
remainderDataLen = 0;
}
-
- if (space > 0) {
- data.awaitAllAcksReceived(fileInfo.id());
-
- IgfsEntryInfo fileInfo0 = meta.reserveSpace(path, fileInfo.id(), space, streamRange);
-
- if (fileInfo0 == null)
- throw new IOException("File was concurrently deleted: " + path);
- else
- fileInfo = fileInfo0;
-
- streamRange = initialStreamRange(fileInfo);
-
- space = 0;
- }
}
catch (IgniteCheckedException e) {
- throw new IOException("Failed to flush data [path=" + path + ", space=" + space + ']', e);
+ throw new IOException("Failed to flush data (remainder) [path=" + path + ", space=" + space + ']', e);
}
}
/** {@inheritDoc} */
- @Override protected void onClose() throws IOException {
- onClose(false);
- }
+ @Override public final void close() throws IOException {
+ synchronized (mux) {
+ // Do nothing if stream is already closed.
+ if (closed)
+ return;
- /**
- * Close callback. It will be called only once in synchronized section.
- *
- * @param deleted Whether we already know that the file was deleted.
- * @throws IOException If failed.
- */
- private void onClose(boolean deleted) throws IOException {
- assert Thread.holdsLock(this);
-
- if (onCloseGuard.compareAndSet(false, true)) {
- // Notify backing secondary file system batch to finish.
- if (mode != PRIMARY) {
- assert batch != null;
+ // Set closed flag immediately.
+ closed = true;
- batch.finish();
- }
+ // Flush data.
+ IOException err = null;
- // Ensure file existence.
- boolean exists;
+ boolean flushSuccess = false;
try {
- exists = !deleted && meta.exists(fileInfo.id());
- }
- catch (IgniteCheckedException e) {
- throw new IOException("File to read file metadata: " + path, e);
- }
+ sendBufferIfNotEmpty();
- if (exists) {
- IOException err = null;
+ flushRemainder();
- try {
- data.writeClose(fileInfo);
+ igfsCtx.data().writeClose(fileInfo.id(), true);
- writeCompletionFut.get();
- }
- catch (IgniteCheckedException e) {
- err = new IOException("Failed to close stream [path=" + path + ", fileInfo=" + fileInfo + ']', e);
- }
+ flushSuccess = true;
+ }
+ catch (Exception e) {
+ err = new IOException("Failed to flush data during stream close [path=" + path +
+ ", fileInfo=" + fileInfo + ']', e);
+ }
- metrics.addWrittenBytesTime(bytes, time);
+ // Unlock the file after data is flushed.
+ try {
+ if (flushSuccess && space > 0)
+ igfsCtx.meta().unlock(fileInfo.id(), fileInfo.lockId(), System.currentTimeMillis(), true,
+ space, streamRange);
+ else
+ igfsCtx.meta().unlock(fileInfo.id(), fileInfo.lockId(), System.currentTimeMillis());
+ }
+ catch (Exception e) {
+ if (err == null)
+ err = new IOException("File to release file lock: " + path, e);
+ else
+ err.addSuppressed(e);
+ }
+
+ // Finally, await secondary file system flush.
+ if (batch != null) {
+ batch.finish();
- // Await secondary file system processing to finish.
if (mode == DUAL_SYNC) {
try {
batch.await();
@@ -356,40 +336,141 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
if (err == null)
err = new IOException("Failed to close secondary file system stream [path=" + path +
", fileInfo=" + fileInfo + ']', e);
+ else
+ err.addSuppressed(e);
}
}
+ }
- long modificationTime = System.currentTimeMillis();
+ // Throw error, if any.
+ if (err != null)
+ throw err;
- try {
- meta.unlock(fileInfo, modificationTime);
- }
- catch (IgfsPathNotFoundException ignore) {
- data.delete(fileInfo); // Safety to ensure that all data blocks are deleted.
+ igfsCtx.igfs().localMetrics().addWrittenBytesTime(bytes, time);
+ igfsCtx.igfs().localMetrics().decrementFilesOpenedForWrite();
- throw new IOException("File was concurrently deleted: " + path);
- }
- catch (IgniteCheckedException e) {
- throw new IOException("File to read file metadata: " + path, e);
+ GridEventStorageManager evts = igfsCtx.kernalContext().event();
+
+ if (evts.isRecordable(EVT_IGFS_FILE_CLOSED_WRITE))
+ evts.record(new IgfsEvent(path, igfsCtx.kernalContext().discovery().localNode(),
+ EVT_IGFS_FILE_CLOSED_WRITE, bytes));
+ }
+ }
+
+ /**
+ * Validate this stream is open.
+ *
+ * @throws IOException If this stream is closed.
+ */
+ private void checkClosed(@Nullable DataInput in, int len) throws IOException {
+ assert Thread.holdsLock(mux);
+
+ if (closed) {
+ // Must read data from stream before throwing exception.
+ if (in != null)
+ in.skipBytes(len);
+
+ throw new IOException("Stream has been closed: " + this);
+ }
+ }
+
+ /**
+ * Send local buffer if it is full.
+ *
+ * @throws IOException If failed.
+ */
+ private void sendBufferIfFull() throws IOException {
+ if (buf.position() >= bufSize)
+ sendBuffer();
+ }
+
+ /**
+ * Send local buffer if at least something is stored there.
+ *
+ * @throws IOException If failed.
+ */
+ private void sendBufferIfNotEmpty() throws IOException {
+ if (buf != null && buf.position() > 0)
+ sendBuffer();
+ }
+
+ /**
+ * Send all local-buffered data to server.
+ *
+ * @throws IOException In case of IO exception.
+ */
+ private void sendBuffer() throws IOException {
+ buf.flip();
+
+ send(buf, buf.remaining());
+
+ buf = null;
+ }
+
+ /**
+ * Store data block.
+ *
+ * @param data Block.
+ * @param writeLen Write length.
+ * @throws IOException If failed.
+ */
+ private void send(Object data, int writeLen) throws IOException {
+ assert Thread.holdsLock(mux);
+ assert data instanceof ByteBuffer || data instanceof DataInput;
+
+ try {
+ // Increment metrics.
+ bytes += writeLen;
+ space += writeLen;
+
+ int blockSize = fileInfo.blockSize();
+
+ // If data length is not enough to fill full block, fill the remainder and return.
+ if (remainderDataLen + writeLen < blockSize) {
+ if (remainder == null)
+ remainder = new byte[blockSize];
+ else if (remainder.length != blockSize) {
+ assert remainderDataLen == remainder.length;
+
+ byte[] allocated = new byte[blockSize];
+
+ U.arrayCopy(remainder, 0, allocated, 0, remainder.length);
+
+ remainder = allocated;
}
- if (err != null)
- throw err;
+ if (data instanceof ByteBuffer)
+ ((ByteBuffer) data).get(remainder, remainderDataLen, writeLen);
+ else
+ ((DataInput) data).readFully(remainder, remainderDataLen, writeLen);
+
+ remainderDataLen += writeLen;
}
else {
- try {
- if (mode == DUAL_SYNC)
- batch.await();
+ if (data instanceof ByteBuffer) {
+ remainder = igfsCtx.data().storeDataBlocks(fileInfo, fileInfo.length() + space, remainder,
+ remainderDataLen, (ByteBuffer) data, false, streamRange, batch);
}
- catch (IgniteCheckedException e) {
- throw new IOException("Failed to close secondary file system stream [path=" + path +
- ", fileInfo=" + fileInfo + ']', e);
- }
- finally {
- data.delete(fileInfo);
+ else {
+ remainder = igfsCtx.data().storeDataBlocks(fileInfo, fileInfo.length() + space, remainder,
+ remainderDataLen, (DataInput) data, writeLen, false, streamRange, batch);
}
+
+ remainderDataLen = remainder == null ? 0 : remainder.length;
}
}
+ catch (IgniteCheckedException e) {
+ throw new IOException("Failed to store data into file: " + path, e);
+ }
+ }
+
+ /**
+ * Allocate new buffer.
+ *
+ * @return New buffer.
+ */
+ private ByteBuffer allocateNewBuffer() {
+ return ByteBuffer.allocate(bufSize);
}
/**
@@ -421,11 +502,46 @@ class IgfsOutputStreamImpl extends IgfsOutputStreamAdapter {
IgniteUuid prevAffKey = map == null ? null : map.affinityKey(lastBlockOff, false);
- IgniteUuid affKey = data.nextAffinityKey(prevAffKey);
+ IgniteUuid affKey = igfsCtx.data().nextAffinityKey(prevAffKey);
return affKey == null ? null : new IgfsFileAffinityRange(off, off, affKey);
}
+ /**
+ * Optimize buffer size.
+ *
+ * @param bufSize Requested buffer size.
+ * @param fileInfo File info.
+ * @return Optimized buffer size.
+ */
+ private static int optimizeBufferSize(int bufSize, IgfsEntryInfo fileInfo) {
+ assert bufSize > 0;
+
+ if (fileInfo == null)
+ return bufSize;
+
+ int blockSize = fileInfo.blockSize();
+
+ if (blockSize <= 0)
+ return bufSize;
+
+ if (bufSize <= blockSize)
+ // Optimize minimum buffer size to be equal file's block size.
+ return blockSize;
+
+ int maxBufSize = blockSize * MAX_BLOCKS_CNT;
+
+ if (bufSize > maxBufSize)
+ // There is no profit or optimization from larger buffers.
+ return maxBufSize;
+
+ if (fileInfo.length() == 0)
+ // Make buffer size multiple of block size (optimized for new files).
+ return bufSize / blockSize * blockSize;
+
+ return bufSize;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(IgfsOutputStreamImpl.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/177ebd5c/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaFileUnlockProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaFileUnlockProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaFileUnlockProcessor.java
index 6827e4a..8d23490 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaFileUnlockProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/meta/IgfsMetaFileUnlockProcessor.java
@@ -24,8 +24,11 @@ import org.apache.ignite.binary.BinaryReader;
import org.apache.ignite.binary.BinaryWriter;
import org.apache.ignite.binary.Binarylizable;
import org.apache.ignite.internal.processors.igfs.IgfsEntryInfo;
+import org.apache.ignite.internal.processors.igfs.IgfsFileAffinityRange;
+import org.apache.ignite.internal.processors.igfs.IgfsFileMap;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorException;
@@ -46,6 +49,15 @@ public class IgfsMetaFileUnlockProcessor implements EntryProcessor<IgniteUuid, I
/** Modification time. */
private long modificationTime;
+ /** Whether to update space. */
+ private boolean updateSpace;
+
+ /** Space. */
+ private long space;
+
+ /** Affinity range. */
+ private IgfsFileAffinityRange affRange;
+
/**
* Default constructor.
*/
@@ -57,17 +69,36 @@ public class IgfsMetaFileUnlockProcessor implements EntryProcessor<IgniteUuid, I
* Constructor.
*
* @param modificationTime Modification time.
+ * @param updateSpace Whether to update space.
+ * @param space Space.
+ * @param affRange Affinity range.
*/
- public IgfsMetaFileUnlockProcessor(long modificationTime) {
+ public IgfsMetaFileUnlockProcessor(long modificationTime, boolean updateSpace, long space,
+ @Nullable IgfsFileAffinityRange affRange) {
this.modificationTime = modificationTime;
+ this.updateSpace = updateSpace;
+ this.space = space;
+ this.affRange = affRange;
}
/** {@inheritDoc} */
@Override public Void process(MutableEntry<IgniteUuid, IgfsEntryInfo> entry, Object... args)
throws EntryProcessorException {
- IgfsEntryInfo old = entry.getValue();
+ IgfsEntryInfo oldInfo = entry.getValue();
+
+ assert oldInfo != null;
+
+ IgfsEntryInfo newInfo = oldInfo.unlock(modificationTime);
+
+ if (updateSpace) {
+ IgfsFileMap newMap = new IgfsFileMap(newInfo.fileMap());
- entry.setValue(old.unlock(modificationTime));
+ newMap.addRange(affRange);
+
+ newInfo = newInfo.length(newInfo.length() + space).fileMap(newMap);
+ }
+
+ entry.setValue(newInfo);
return null;
}
@@ -75,11 +106,27 @@ public class IgfsMetaFileUnlockProcessor implements EntryProcessor<IgniteUuid, I
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
out.writeLong(modificationTime);
+
+ if (updateSpace) {
+ out.writeBoolean(true);
+ out.writeLong(space);
+ out.writeObject(affRange);
+ }
+ else
+ out.writeBoolean(false);
}
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
modificationTime = in.readLong();
+
+ if (in.readBoolean()) {
+ updateSpace = true;
+ space = in.readLong();
+ affRange = (IgfsFileAffinityRange)in.readObject();
+ }
+ else
+ updateSpace = false;
}
/** {@inheritDoc} */
@@ -87,6 +134,14 @@ public class IgfsMetaFileUnlockProcessor implements EntryProcessor<IgniteUuid, I
BinaryRawWriter out = writer.rawWriter();
out.writeLong(modificationTime);
+
+ if (updateSpace) {
+ out.writeBoolean(true);
+ out.writeLong(space);
+ out.writeObject(affRange);
+ }
+ else
+ out.writeBoolean(false);
}
/** {@inheritDoc} */
@@ -94,6 +149,14 @@ public class IgfsMetaFileUnlockProcessor implements EntryProcessor<IgniteUuid, I
BinaryRawReader in = reader.rawReader();
modificationTime = in.readLong();
+
+ if (in.readBoolean()) {
+ updateSpace = true;
+ space = in.readLong();
+ affRange = in.readObject();
+ }
+ else
+ updateSpace = false;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/177ebd5c/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
index 8a39de0..a0bef0f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
@@ -2996,10 +2996,12 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
* @param chunks Expected data.
* @throws Exception If failed.
*/
- protected void checkFile(IgfsImpl igfs, UniversalFileSystemAdapter igfsSecondary, IgfsPath file,
+ protected void checkFile(@Nullable IgfsImpl igfs, UniversalFileSystemAdapter igfsSecondary, IgfsPath file,
@Nullable byte[]... chunks) throws Exception {
- checkExist(igfs, file);
- checkFileContent(igfs, file, chunks);
+ if (igfs != null) {
+ checkExist(igfs, file);
+ checkFileContent(igfs, file, chunks);
+ }
if (dual) {
checkExist(igfsSecondary, file);
@@ -3025,16 +3027,18 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
is = igfs.open(file);
int chunkIdx = 0;
+ int pos = 0;
for (byte[] chunk : chunks) {
byte[] buf = new byte[chunk.length];
- is.readFully(0, buf);
+ is.readFully(pos, buf);
assert Arrays.equals(chunk, buf) : "Bad chunk [igfs=" + igfs.name() + ", chunkIdx=" + chunkIdx +
", expected=" + Arrays.toString(chunk) + ", actual=" + Arrays.toString(buf) + ']';
chunkIdx++;
+ pos += chunk.length;
}
is.close();
http://git-wip-us.apache.org/repos/asf/ignite/blob/177ebd5c/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDataManagerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDataManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDataManagerSelfTest.java
index 013bb18..0d1a66f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDataManagerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDataManagerSelfTest.java
@@ -178,7 +178,7 @@ public class IgfsDataManagerSelfTest extends IgfsCommonAbstractTest {
rnd.nextBytes(data);
- IgniteInternalFuture<Boolean> fut = mgr.writeStart(info);
+ IgniteInternalFuture<Boolean> fut = mgr.writeStart(info.id());
expectsStoreFail(info, data, "Not enough space reserved to store data");
@@ -195,7 +195,7 @@ public class IgfsDataManagerSelfTest extends IgfsCommonAbstractTest {
assert remainder == null;
- mgr.writeClose(info);
+ mgr.writeClose(info.id(), false);
fut.get(3000);
@@ -269,7 +269,7 @@ public class IgfsDataManagerSelfTest extends IgfsCommonAbstractTest {
info = info.length(info.length() + data.length + remainder.length);
- IgniteInternalFuture<Boolean> fut = mgr.writeStart(info);
+ IgniteInternalFuture<Boolean> fut = mgr.writeStart(info.id());
IgfsFileAffinityRange range = new IgfsFileAffinityRange();
@@ -287,7 +287,7 @@ public class IgfsDataManagerSelfTest extends IgfsCommonAbstractTest {
assert left2 == null;
- mgr.writeClose(info);
+ mgr.writeClose(info.id(), false);
fut.get(3000);
@@ -358,7 +358,7 @@ public class IgfsDataManagerSelfTest extends IgfsCommonAbstractTest {
info = info.length(info.length() + data.length * writesCnt);
- IgniteInternalFuture<Boolean> fut = mgr.writeStart(info);
+ IgniteInternalFuture<Boolean> fut = mgr.writeStart(info.id());
for (int j = 0; j < 64; j++) {
Arrays.fill(data, (byte)(j / 4));
@@ -369,7 +369,7 @@ public class IgfsDataManagerSelfTest extends IgfsCommonAbstractTest {
assert left == null : "No remainder should be returned if flush is true: " + Arrays.toString(left);
}
- mgr.writeClose(info);
+ mgr.writeClose(info.id(), false);
assertTrue(range.regionEqual(new IgfsFileAffinityRange(0, writesCnt * chunkSize - 1, null)));
http://git-wip-us.apache.org/repos/asf/ignite/blob/177ebd5c/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorValidationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorValidationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorValidationSelfTest.java
index 0ce1036..29bb2cd 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorValidationSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsProcessorValidationSelfTest.java
@@ -445,27 +445,41 @@ public class IgfsProcessorValidationSelfTest extends IgfsCommonAbstractTest {
/**
* @throws Exception If failed.
*/
- public void testInvalidEndpointTcpPort() throws Exception {
+ public void testZeroEndpointTcpPort() throws Exception {
+ checkIvalidPort(0);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testNegativeEndpointTcpPort() throws Exception {
+ checkIvalidPort(-1);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testTooBigEndpointTcpPort() throws Exception {
+ checkIvalidPort(65536);
+ }
+
+ /**
+ * Check invalid port handling.
+ *
+ * @param port Port.
+ * @throws Exception If failed.
+ */
+ private void checkIvalidPort(int port) throws Exception {
final String failMsg = "IGFS endpoint TCP port is out of range";
g1Cfg.setCacheConfiguration(concat(dataCaches(1024), metaCaches(), CacheConfiguration.class));
final String igfsCfgName = "igfs-cfg";
final IgfsIpcEndpointConfiguration igfsEndpointCfg = new IgfsIpcEndpointConfiguration();
- igfsEndpointCfg.setPort(0);
+ igfsEndpointCfg.setPort(port);
g1IgfsCfg1.setName(igfsCfgName);
g1IgfsCfg1.setIpcEndpointConfiguration(igfsEndpointCfg);
checkGridStartFails(g1Cfg, failMsg, true);
-
- igfsEndpointCfg.setPort(-1);
- g1IgfsCfg1.setIpcEndpointConfiguration(igfsEndpointCfg);
-
- checkGridStartFails(g1Cfg, failMsg, true);
-
- igfsEndpointCfg.setPort(65536);
- g1IgfsCfg1.setIpcEndpointConfiguration(igfsEndpointCfg);
-
- checkGridStartFails(g1Cfg, failMsg, true);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/177ebd5c/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsTaskSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsTaskSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsTaskSelfTest.java
index 57174ea..e5abfea 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsTaskSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsTaskSelfTest.java
@@ -137,7 +137,7 @@ public class IgfsTaskSelfTest extends IgfsCommonAbstractTest {
metaCacheCfg.setName("metaCache");
metaCacheCfg.setCacheMode(REPLICATED);
metaCacheCfg.setAtomicityMode(TRANSACTIONAL);
- dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+ metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
IgniteConfiguration cfg = new IgniteConfiguration();
[05/11] ignite git commit: IGNITE-3259: Delete worker is not started
on client nodes any more.
Posted by vo...@apache.org.
IGNITE-3259: Delete worker is not started on client nodes any more.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/33b0eb24
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/33b0eb24
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/33b0eb24
Branch: refs/heads/master
Commit: 33b0eb24dc7daebc13c07eb967e5b4ed7171ae55
Parents: 3cabdcf
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Jun 6 18:10:36 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Jun 8 14:49:36 2016 +0300
----------------------------------------------------------------------
.../processors/igfs/IgfsMetaManager.java | 25 ++++++++++++++------
1 file changed, 18 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/33b0eb24/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index 1a88a88..e1a181d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -208,19 +208,20 @@ public class IgfsMetaManager extends IgfsManager {
locNode = igfsCtx.kernalContext().discovery().localNode();
// Start background delete worker.
- delWorker = new IgfsDeleteWorker(igfsCtx);
+ if (!client) {
+ delWorker = new IgfsDeleteWorker(igfsCtx);
- delWorker.start();
+ delWorker.start();
+ }
}
/** {@inheritDoc} */
@Override protected void onKernalStop0(boolean cancel) {
IgfsDeleteWorker delWorker0 = delWorker;
- if (delWorker0 != null)
+ if (delWorker0 != null) {
delWorker0.cancel();
- if (delWorker0 != null) {
try {
U.join(delWorker0);
}
@@ -1137,7 +1138,7 @@ public class IgfsMetaManager extends IgfsManager {
tx.commit();
- delWorker.signal();
+ signalDeleteWorker();
return newInfo.id();
}
@@ -1214,7 +1215,7 @@ public class IgfsMetaManager extends IgfsManager {
tx.commit();
- delWorker.signal();
+ signalDeleteWorker();
return victimId;
}
@@ -2524,7 +2525,7 @@ public class IgfsMetaManager extends IgfsManager {
Boolean res = synchronizeAndExecute(task, fs, false, Collections.singleton(trashId), path);
- delWorker.signal();
+ signalDeleteWorker();
return res;
}
@@ -3389,4 +3390,14 @@ public class IgfsMetaManager extends IgfsManager {
else
IgfsUtils.sendEvents(igfsCtx.kernalContext(), leafPath, EventType.EVT_IGFS_DIR_CREATED);
}
+
+ /**
+ * Signal delete worker thread.
+ */
+ private void signalDeleteWorker() {
+ IgfsDeleteWorker delWorker0 = delWorker;
+
+ if (delWorker0 != null)
+ delWorker0.signal();
+ }
}
\ No newline at end of file
[03/11] ignite git commit: IGNITE-3257: IGFS:
FileSystemConfiguration.DFLT_INIT_DFLT_PATH_MODES has been changed to
"false".
Posted by vo...@apache.org.
IGNITE-3257: IGFS: FileSystemConfiguration.DFLT_INIT_DFLT_PATH_MODES has been changed to "false".
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d4046dc4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d4046dc4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d4046dc4
Branch: refs/heads/master
Commit: d4046dc46c0b6e4f122ef45147dd44c09e4c283c
Parents: 4804db9
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Jun 6 15:50:13 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Jun 8 14:49:02 2016 +0300
----------------------------------------------------------------------
.../org/apache/ignite/configuration/FileSystemConfiguration.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d4046dc4/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
index 75a9db0..fa15670 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/FileSystemConfiguration.java
@@ -83,7 +83,7 @@ public class FileSystemConfiguration {
public static final boolean DFLT_IPC_ENDPOINT_ENABLED = true;
/** Default value of whether to initialize default path modes. */
- public static final boolean DFLT_INIT_DFLT_PATH_MODES = true;
+ public static final boolean DFLT_INIT_DFLT_PATH_MODES = false;
/** Default value of metadata co-location flag. */
public static final boolean DFLT_COLOCATE_META = true;
[08/11] ignite git commit: IGNITE-3274: Hadoop: Fixed NPE in
BasicUserNameMapper.
Posted by vo...@apache.org.
IGNITE-3274: Hadoop: Fixed NPE in BasicUserNameMapper.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/42739504
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/42739504
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/42739504
Branch: refs/heads/master
Commit: 4273950458a9bb2f83d5fc0489da49aa1fa1dfaf
Parents: c300448
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Jun 8 11:50:22 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Jun 8 14:57:56 2016 +0300
----------------------------------------------------------------------
.../ignite/hadoop/util/BasicUserNameMapper.java | 4 +---
.../hadoop/util/BasicUserNameMapperSelfTest.java | 19 ++++++++++++++++++-
2 files changed, 19 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/42739504/modules/hadoop/src/main/java/org/apache/ignite/hadoop/util/BasicUserNameMapper.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/util/BasicUserNameMapper.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/util/BasicUserNameMapper.java
index aea7196..c34808a 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/util/BasicUserNameMapper.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/util/BasicUserNameMapper.java
@@ -41,9 +41,7 @@ public class BasicUserNameMapper implements UserNameMapper {
/** {@inheritDoc} */
@Nullable @Override public String map(String name) {
- assert mappings != null;
-
- String res = mappings.get(name);
+ String res = mappings != null ? mappings.get(name) : null;
return res != null ? res : useDfltUsrName ? dfltUsrName : name;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/42739504/modules/hadoop/src/test/java/org/apache/ignite/hadoop/util/BasicUserNameMapperSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/hadoop/util/BasicUserNameMapperSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/hadoop/util/BasicUserNameMapperSelfTest.java
index 54b03f9..fd8fdef 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/hadoop/util/BasicUserNameMapperSelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/hadoop/util/BasicUserNameMapperSelfTest.java
@@ -28,13 +28,30 @@ import java.util.Map;
*/
public class BasicUserNameMapperSelfTest extends GridCommonAbstractTest {
/**
+ * Test null mappings.
+ *
+ * @throws Exception If failed.
+ */
+ public void testNullMappings() throws Exception {
+ checkNullOrEmptyMappings(null);
+ }
+
+ /**
* Test empty mappings.
*
* @throws Exception If failed.
*/
public void testEmptyMappings() throws Exception {
- Map<String, String> map = new HashMap<>();
+ checkNullOrEmptyMappings(new HashMap<String, String>());
+ }
+ /**
+ * Check null or empty mappings.
+ *
+ * @param map Mappings.
+ * @throws Exception If failed.
+ */
+ private void checkNullOrEmptyMappings(@Nullable Map<String, String> map) throws Exception {
BasicUserNameMapper mapper = create(map, false, null);
assertNull(mapper.map(null));
[06/11] ignite git commit: Merge remote-tracking branch
'upstream/gridgain-7.6.1' into gridgain-7.6.1
Posted by vo...@apache.org.
Merge remote-tracking branch 'upstream/gridgain-7.6.1' into gridgain-7.6.1
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a60bb3b6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a60bb3b6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a60bb3b6
Branch: refs/heads/master
Commit: a60bb3b658bb6d4688e07e0e4d07aff6a976c6a3
Parents: 33b0eb2 4e82af8
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Jun 8 14:54:37 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Jun 8 14:54:37 2016 +0300
----------------------------------------------------------------------
.../apache/ignite/IgniteSystemProperties.java | 3 +
.../GridCachePartitionExchangeManager.java | 43 ++++++-
.../distributed/GridCacheTxRecoveryFuture.java | 58 ++++++---
.../GridDhtPartitionsExchangeFuture.java | 42 +++++-
.../cache/transactions/IgniteTxManager.java | 9 +-
.../IgniteCacheMessageWriteTimeoutTest.java | 129 +++++++++++++++++++
.../ignite/testsuites/IgniteCacheTestSuite.java | 2 +
7 files changed, 253 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
[04/11] ignite git commit: IGNITE-3258: IGFS: Secondary file system
input stream is opened only when it is really needed.
Posted by vo...@apache.org.
IGNITE-3258: IGFS: Secondary file system input stream is opened only when it is really needed.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3cabdcf0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3cabdcf0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3cabdcf0
Branch: refs/heads/master
Commit: 3cabdcf026fd528fdae305ec2ed832d6ad603cb0
Parents: d4046dc
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Jun 6 17:50:58 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Jun 8 14:49:20 2016 +0300
----------------------------------------------------------------------
...zySecondaryFileSystemPositionedReadable.java | 77 ++++++++++++++++++++
.../processors/igfs/IgfsMetaManager.java | 18 ++++-
.../processors/igfs/IgfsAbstractSelfTest.java | 3 +
.../processors/igfs/IgfsModesSelfTest.java | 1 +
.../igfs/HadoopFIleSystemFactorySelfTest.java | 1 +
5 files changed, 98 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/3cabdcf0/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsLazySecondaryFileSystemPositionedReadable.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsLazySecondaryFileSystemPositionedReadable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsLazySecondaryFileSystemPositionedReadable.java
new file mode 100644
index 0000000..0a57c34
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsLazySecondaryFileSystemPositionedReadable.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
+import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadable;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.io.IOException;
+
+/**
+ * Lazy readable entity which is opened on demand.
+ */
+public class IgfsLazySecondaryFileSystemPositionedReadable implements IgfsSecondaryFileSystemPositionedReadable {
+ /** File system. */
+ private final IgfsSecondaryFileSystem fs;
+
+ /** Path. */
+ private final IgfsPath path;
+
+ /** Buffer size. */
+ private final int bufSize;
+
+ /** Target stream. */
+ private IgfsSecondaryFileSystemPositionedReadable target;
+
+ /**
+ * Constructor.
+ *
+ * @param fs File system.
+ * @param path Path.
+ * @param bufSize Buffer size.
+ */
+ public IgfsLazySecondaryFileSystemPositionedReadable(IgfsSecondaryFileSystem fs, IgfsPath path, int bufSize) {
+ assert fs != null;
+ assert path != null;
+
+ this.fs = fs;
+ this.path = path;
+ this.bufSize = bufSize;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int read(long pos, byte[] buf, int off, int len) throws IOException {
+ if (target == null)
+ target = fs.open(path, bufSize);
+
+ return target.read(pos, buf, off, len);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws IOException {
+ if (target != null)
+ target.close();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IgfsLazySecondaryFileSystemPositionedReadable.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3cabdcf0/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index c3783b0..1a88a88 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -2192,7 +2192,7 @@ public class IgfsMetaManager extends IgfsManager {
throw fsException(new IgfsPathIsDirectoryException("Failed to open file (not a file): " +
path));
- return new IgfsSecondaryInputStreamDescriptor(info, fs.open(path, bufSize));
+ return new IgfsSecondaryInputStreamDescriptor(info, lazySecondaryReader(fs, path, bufSize));
}
// If failed, try synchronize.
@@ -2208,7 +2208,8 @@ public class IgfsMetaManager extends IgfsManager {
throw fsException(new IgfsPathIsDirectoryException("Failed to open file " +
"(not a file): " + path));
- return new IgfsSecondaryInputStreamDescriptor(infos.get(path), fs.open(path, bufSize));
+ return new IgfsSecondaryInputStreamDescriptor(infos.get(path),
+ lazySecondaryReader(fs, path, bufSize));
}
@Override public IgfsSecondaryInputStreamDescriptor onFailure(@Nullable Exception err)
@@ -2232,6 +2233,19 @@ public class IgfsMetaManager extends IgfsManager {
}
/**
+ * Create lazy secondary file system reader.
+ *
+ * @param fs File system.
+ * @param path Path.
+ * @param bufSize Buffer size.
+ * @return Lazy reader.
+ */
+ private static IgfsLazySecondaryFileSystemPositionedReadable lazySecondaryReader(IgfsSecondaryFileSystem fs,
+ IgfsPath path, int bufSize) {
+ return new IgfsLazySecondaryFileSystemPositionedReadable(fs, path, bufSize);
+ }
+
+ /**
* Synchronizes with secondary file system.
*
* @param fs File system.
http://git-wip-us.apache.org/repos/asf/ignite/blob/3cabdcf0/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
index 001868f..8a39de0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
@@ -1094,6 +1094,9 @@ public abstract class IgfsAbstractSelfTest extends IgfsCommonAbstractTest {
createFile(igfs.asSecondary(), FILE, true, chunk);
checkFileContent(igfs, FILE, chunk);
+
+ // Read again when the whole file is in memory.
+ checkFileContent(igfs, FILE, chunk);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/3cabdcf0/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsModesSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsModesSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsModesSelfTest.java
index df537bc..1e54f8c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsModesSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsModesSelfTest.java
@@ -102,6 +102,7 @@ public class IgfsModesSelfTest extends IgfsCommonAbstractTest {
igfsCfg.setMetaCacheName("replicated");
igfsCfg.setName("igfs");
igfsCfg.setBlockSize(512 * 1024);
+ igfsCfg.setInitializeDefaultPathModes(true);
if (setNullMode)
igfsCfg.setDefaultMode(null);
http://git-wip-us.apache.org/repos/asf/ignite/blob/3cabdcf0/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFIleSystemFactorySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFIleSystemFactorySelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFIleSystemFactorySelfTest.java
index 1d02f0f..e4c64ff 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFIleSystemFactorySelfTest.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/igfs/HadoopFIleSystemFactorySelfTest.java
@@ -226,6 +226,7 @@ public class HadoopFIleSystemFactorySelfTest extends IgfsCommonAbstractTest {
igfsCfg.setDefaultMode(dfltMode);
igfsCfg.setIpcEndpointConfiguration(endpointCfg);
igfsCfg.setSecondaryFileSystem(secondaryFs);
+ igfsCfg.setInitializeDefaultPathModes(true);
CacheConfiguration dataCacheCfg = defaultCacheConfiguration();
[11/11] ignite git commit: Merge branch 'gridgain-7.6.1'
Posted by vo...@apache.org.
Merge branch 'gridgain-7.6.1'
# Conflicts:
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/61210943
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/61210943
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/61210943
Branch: refs/heads/master
Commit: 61210943c1c627aa31dba6a80531a3fbac70b6dd
Parents: 77793f5 7761e5f
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Jun 8 15:18:29 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Jun 8 15:18:29 2016 +0300
----------------------------------------------------------------------
.../apache/ignite/IgniteSystemProperties.java | 3 +
.../configuration/FileSystemConfiguration.java | 36 +-
.../GridCachePartitionExchangeManager.java | 43 +-
.../distributed/GridCacheTxRecoveryFuture.java | 58 +-
.../GridDhtPartitionsExchangeFuture.java | 42 +-
.../cache/transactions/IgniteTxManager.java | 9 +-
.../internal/processors/igfs/IgfsAsyncImpl.java | 6 -
.../processors/igfs/IgfsDataManager.java | 89 ++-
.../processors/igfs/IgfsDeleteWorker.java | 36 --
.../ignite/internal/processors/igfs/IgfsEx.java | 9 -
.../internal/processors/igfs/IgfsImpl.java | 294 +++-------
.../processors/igfs/IgfsInputStreamImpl.java | 6 +-
...zySecondaryFileSystemPositionedReadable.java | 77 +++
.../processors/igfs/IgfsMetaManager.java | 303 +++++-----
.../igfs/IgfsOutputStreamAdapter.java | 265 ---------
.../processors/igfs/IgfsOutputStreamImpl.java | 558 +++++++++++--------
.../internal/processors/igfs/IgfsUtils.java | 2 +-
.../igfs/meta/IgfsMetaFileUnlockProcessor.java | 69 ++-
.../ignite/igfs/IgfsFragmentizerSelfTest.java | 2 -
.../IgniteCacheMessageWriteTimeoutTest.java | 129 +++++
.../processors/igfs/IgfsAbstractSelfTest.java | 15 +-
.../igfs/IgfsDataManagerSelfTest.java | 12 +-
.../processors/igfs/IgfsModesSelfTest.java | 1 +
.../igfs/IgfsProcessorValidationSelfTest.java | 38 +-
.../processors/igfs/IgfsSizeSelfTest.java | 133 -----
.../processors/igfs/IgfsTaskSelfTest.java | 2 +-
.../ignite/testsuites/IgniteCacheTestSuite.java | 2 +
.../ignite/hadoop/util/BasicUserNameMapper.java | 4 +-
.../util/BasicUserNameMapperSelfTest.java | 19 +-
.../igfs/HadoopFIleSystemFactorySelfTest.java | 1 +
.../HadoopDefaultMapReducePlannerSelfTest.java | 6 -
31 files changed, 1115 insertions(+), 1154 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/61210943/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
----------------------------------------------------------------------
[02/11] ignite git commit: IGNITE-3256: IGFS: Removed "exists" check
during input stream close.
Posted by vo...@apache.org.
IGNITE-3256: IGFS: Removed "exists" check during input stream close.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4804db9c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4804db9c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4804db9c
Branch: refs/heads/master
Commit: 4804db9ca806ee7c743f5f54d6eac37163513759
Parents: 4799b26
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Jun 6 15:40:59 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Jun 8 14:48:47 2016 +0300
----------------------------------------------------------------------
.../ignite/internal/processors/igfs/IgfsInputStreamImpl.java | 6 +-----
1 file changed, 1 insertion(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4804db9c/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java
index de7071a..ca2f9f7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsInputStreamImpl.java
@@ -317,13 +317,9 @@ public class IgfsInputStreamImpl extends IgfsInputStreamAdapter {
pendingFutsLock.unlock();
}
}
-
- // Safety to ensure no orphaned data blocks exist in case file was concurrently deleted.
- if (!meta.exists(fileInfo.id()))
- data.delete(fileInfo);
}
}
- catch (IgniteCheckedException e) {
+ catch (Exception e) {
throw new IOException("File to close the file: " + path, e);
}
finally {