You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/27 14:19:44 UTC
[08/27] ignite git commit: IGNITE-1299: Implemented IGFS file unlock with retries.
IGNITE-1299: Implemented IGFS file unlock with retries.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9fe3e8fd
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9fe3e8fd
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9fe3e8fd
Branch: refs/heads/ignite-1124
Commit: 9fe3e8fd884f2f19bfe3fb39c9cda89c7ae495d8
Parents: e4ba2eb
Author: iveselovskiy <iv...@gridgain.com>
Authored: Thu Aug 27 12:11:13 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Aug 27 12:11:13 2015 +0300
----------------------------------------------------------------------
.../ignite/internal/IgniteInternalFuture.java | 10 ++++
.../query/GridCacheQueryFutureAdapter.java | 7 +++
.../processors/igfs/IgfsMetaManager.java | 43 +++++++--------
.../internal/processors/igfs/IgfsUtils.java | 52 ++++++++++++++++++
.../util/future/GridFinishedFuture.java | 5 ++
.../internal/util/future/GridFutureAdapter.java | 58 +++++++++++++-------
6 files changed, 131 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/9fe3e8fd/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
index 2b7b821..74cfb06 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
@@ -69,6 +69,16 @@ public interface IgniteInternalFuture<R> {
public R get(long timeout, TimeUnit unit) throws IgniteCheckedException;
/**
+ * Synchronously waits for completion of the computation and returns computation result ignoring interrupts.
+ *
+ * @return Computation result.
+ * @throws IgniteFutureCancelledCheckedException Subclass of {@link IgniteCheckedException} thrown if computation
+ * was cancelled.
+ * @throws IgniteCheckedException If computation failed.
+ */
+ public R getUninterruptibly() throws IgniteCheckedException;
+
+ /**
* Cancels this future.
*
* @return {@code True} if future was canceled (i.e. was not finished prior to this call).
http://git-wip-us.apache.org/repos/asf/ignite/blob/9fe3e8fd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
index 53017c9..ed5ad77 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
@@ -479,6 +479,13 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
return super.get(timeout, unit);
}
+ /** {@inheritDoc} */
+ @Override public Collection<R> getUninterruptibly() throws IgniteCheckedException {
+ if (!isDone())
+ loadAllPages();
+
+ return super.getUninterruptibly();
+ }
/**
* @param nodeId Sender node id.
http://git-wip-us.apache.org/repos/asf/ignite/blob/9fe3e8fd/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index b98c5d8..aabe503 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -478,49 +478,46 @@ public class IgfsMetaManager extends IgfsManager {
* @param modificationTime Modification time to write to file info.
* @throws IgniteCheckedException If failed.
*/
- public void unlock(IgfsFileInfo info, long modificationTime) throws IgniteCheckedException {
+ public void unlock(final IgfsFileInfo info, final long modificationTime) throws IgniteCheckedException {
assert validTxState(false);
assert info != null;
if (busyLock.enterBusy()) {
try {
- IgniteUuid lockId = info.lockId();
+ final IgniteUuid lockId = info.lockId();
if (lockId == null)
return;
// Temporarily clear interrupted state for unlocking.
- boolean interrupted = Thread.interrupted();
-
- IgniteUuid fileId = info.id();
-
- IgniteInternalTx tx = metaCache.txStartEx(PESSIMISTIC, REPEATABLE_READ);
+ final boolean interrupted = Thread.interrupted();
try {
- // Lock file ID for this transaction.
- IgfsFileInfo oldInfo = info(fileId);
+ IgfsUtils.doInTransactionWithRetries(metaCache, new IgniteOutClosureX<Void>() {
+ @Override public Void applyx() throws IgniteCheckedException {
+ IgniteUuid fileId = info.id();
- if (oldInfo == null)
- throw fsException(new IgfsPathNotFoundException("Failed to unlock file (file not found): " + fileId));
+ // Lock file ID for this transaction.
+ IgfsFileInfo oldInfo = info(fileId);
- if (!info.lockId().equals(oldInfo.lockId()))
- throw new IgniteCheckedException("Failed to unlock file (inconsistent file lock ID) [fileId=" + fileId +
- ", lockId=" + info.lockId() + ", actualLockId=" + oldInfo.lockId() + ']');
+ if (oldInfo == null)
+ throw fsException(new IgfsPathNotFoundException("Failed to unlock file (file not found): " + fileId));
- IgfsFileInfo newInfo = new IgfsFileInfo(oldInfo, null, modificationTime);
+ if (!info.lockId().equals(oldInfo.lockId()))
+ throw new IgniteCheckedException("Failed to unlock file (inconsistent file lock ID) [fileId=" + fileId +
+ ", lockId=" + info.lockId() + ", actualLockId=" + oldInfo.lockId() + ']');
- boolean put = metaCache.put(fileId, newInfo);
+ IgfsFileInfo newInfo = new IgfsFileInfo(oldInfo, null, modificationTime);
- assert put : "Value was not stored in cache [fileId=" + fileId + ", newInfo=" + newInfo + ']';
+ boolean put = metaCache.put(fileId, newInfo);
- tx.commit();
- }
- catch (GridClosureException e) {
- throw U.cast(e);
+ assert put : "Value was not stored in cache [fileId=" + fileId + ", newInfo=" + newInfo + ']';
+
+ return null;
+ }
+ });
}
finally {
- tx.close();
-
assert validTxState(false);
if (interrupted)
http://git-wip-us.apache.org/repos/asf/ignite/blob/9fe3e8fd/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
index 8026a44..7449f31 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
@@ -18,18 +18,31 @@
package org.apache.ignite.internal.processors.igfs;
import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.igfs.*;
import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.transactions.*;
import org.jetbrains.annotations.*;
import java.lang.reflect.*;
+import static org.apache.ignite.IgniteSystemProperties.*;
+import static org.apache.ignite.transactions.TransactionConcurrency.*;
+import static org.apache.ignite.transactions.TransactionIsolation.*;
+
/**
* Common IGFS utility methods.
*/
public class IgfsUtils {
+ /** Maximum number of cache transaction retries when topology changes. */
+ private static final int MAX_CACHE_TX_RETRIES = IgniteSystemProperties.getInteger(IGNITE_CACHE_RETRIES_COUNT, 100);
+
/**
* Converts any passed exception to IGFS exception.
*
@@ -104,4 +117,43 @@ public class IgfsUtils {
return user;
}
+
+ /**
+ * Performs an operation within a transaction, retrying when the cluster topology changes.
+ *
+ * @param cache Cache to do the transaction on.
+ * @param clo Closure.
+ * @return Result of closure execution.
+ * @throws IgniteCheckedException If failed.
+ */
+ @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+ public static <T> T doInTransactionWithRetries(IgniteInternalCache cache, IgniteOutClosureX<T> clo)
+ throws IgniteCheckedException {
+ assert cache != null;
+
+ int attempts = 0;
+
+ while (attempts < MAX_CACHE_TX_RETRIES) {
+ try (Transaction tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ T res = clo.applyx();
+
+ tx.commit();
+
+ return res;
+ }
+ catch (IgniteException | IgniteCheckedException e) {
+ ClusterTopologyException cte = X.cause(e, ClusterTopologyException.class);
+
+ if (cte != null)
+ ((IgniteFutureImpl)cte.retryReadyFuture()).internalFuture().getUninterruptibly();
+ else
+ throw U.cast(e);
+ }
+
+ attempts++;
+ }
+
+ throw new IgniteCheckedException("Failed to perform operation since max number of attempts " +
+ "exceeded. [maxAttempts=" + MAX_CACHE_TX_RETRIES + ']');
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9fe3e8fd/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
index 242e626..2adee90 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
@@ -126,6 +126,11 @@ public class GridFinishedFuture<T> implements IgniteInternalFuture<T> {
}
/** {@inheritDoc} */
+ @Override public T getUninterruptibly() throws IgniteCheckedException {
+ return get();
+ }
+
+ /** {@inheritDoc} */
@Override public void listen(IgniteInClosure<? super IgniteInternalFuture<T>> lsnr) {
assert lsnr != null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/9fe3e8fd/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
index f8caf22..91ce549 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
@@ -107,6 +107,43 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
/** {@inheritDoc} */
@Override public R get() throws IgniteCheckedException {
+ return get0(ignoreInterrupts);
+ }
+
+ /** {@inheritDoc} */
+ @Override public R getUninterruptibly() throws IgniteCheckedException {
+ return get0(true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public R get(long timeout) throws IgniteCheckedException {
+ // Do not replace with static import, as it may not compile.
+ return get(timeout, TimeUnit.MILLISECONDS);
+ }
+
+ /** {@inheritDoc} */
+ @Override public R get(long timeout, TimeUnit unit) throws IgniteCheckedException {
+ A.ensure(timeout >= 0, "timeout cannot be negative: " + timeout);
+ A.notNull(unit, "unit");
+
+ try {
+ return get0(unit.toNanos(timeout));
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ throw new IgniteInterruptedCheckedException("Got interrupted while waiting for future to complete.", e);
+ }
+ }
+
+ /**
+ * Internal get routine.
+ *
+ * @param ignoreInterrupts Whether to ignore interrupts.
+ * @return Result.
+ * @throws IgniteCheckedException If failed.
+ */
+ private R get0(boolean ignoreInterrupts) throws IgniteCheckedException {
try {
if (endTime == 0) {
if (ignoreInterrupts)
@@ -132,27 +169,6 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
}
}
- /** {@inheritDoc} */
- @Override public R get(long timeout) throws IgniteCheckedException {
- // Do not replace with static import, as it may not compile.
- return get(timeout, TimeUnit.MILLISECONDS);
- }
-
- /** {@inheritDoc} */
- @Override public R get(long timeout, TimeUnit unit) throws IgniteCheckedException {
- A.ensure(timeout >= 0, "timeout cannot be negative: " + timeout);
- A.notNull(unit, "unit");
-
- try {
- return get0(unit.toNanos(timeout));
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
-
- throw new IgniteInterruptedCheckedException("Got interrupted while waiting for future to complete.", e);
- }
- }
-
/**
* @param nanosTimeout Timeout (nanoseconds).
* @return Result.