You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/27 14:19:44 UTC

[08/27] ignite git commit: IGNITE-1299: Implemented IGFS file unlock with retries.

IGNITE-1299: Implemented IGFS file unlock with retries.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9fe3e8fd
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9fe3e8fd
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9fe3e8fd

Branch: refs/heads/ignite-1124
Commit: 9fe3e8fd884f2f19bfe3fb39c9cda89c7ae495d8
Parents: e4ba2eb
Author: iveselovskiy <iv...@gridgain.com>
Authored: Thu Aug 27 12:11:13 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Aug 27 12:11:13 2015 +0300

----------------------------------------------------------------------
 .../ignite/internal/IgniteInternalFuture.java   | 10 ++++
 .../query/GridCacheQueryFutureAdapter.java      |  7 +++
 .../processors/igfs/IgfsMetaManager.java        | 43 +++++++--------
 .../internal/processors/igfs/IgfsUtils.java     | 52 ++++++++++++++++++
 .../util/future/GridFinishedFuture.java         |  5 ++
 .../internal/util/future/GridFutureAdapter.java | 58 +++++++++++++-------
 6 files changed, 131 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/9fe3e8fd/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
index 2b7b821..74cfb06 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
@@ -69,6 +69,16 @@ public interface IgniteInternalFuture<R> {
     public R get(long timeout, TimeUnit unit) throws IgniteCheckedException;
 
     /**
+     * Synchronously waits for completion of the computation and returns computation result ignoring interrupts.
+     *
+     * @return Computation result.
+     * @throws IgniteFutureCancelledCheckedException Subclass of {@link IgniteCheckedException} thrown if computation
+     *     was cancelled.
+     * @throws IgniteCheckedException If computation failed.
+     */
+    public R getUninterruptibly() throws IgniteCheckedException;
+
+    /**
      * Cancels this future.
      *
      * @return {@code True} if future was canceled (i.e. was not finished prior to this call).

http://git-wip-us.apache.org/repos/asf/ignite/blob/9fe3e8fd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
index 53017c9..ed5ad77 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
@@ -479,6 +479,13 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
         return super.get(timeout, unit);
     }
 
+    /** {@inheritDoc} */
+    @Override public Collection<R> getUninterruptibly() throws IgniteCheckedException {
+        if (!isDone())
+            loadAllPages();
+
+        return super.getUninterruptibly();
+    }
 
     /**
      * @param nodeId Sender node id.

http://git-wip-us.apache.org/repos/asf/ignite/blob/9fe3e8fd/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index b98c5d8..aabe503 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -478,49 +478,46 @@ public class IgfsMetaManager extends IgfsManager {
      * @param modificationTime Modification time to write to file info.
      * @throws IgniteCheckedException If failed.
      */
-    public void unlock(IgfsFileInfo info, long modificationTime) throws IgniteCheckedException {
+    public void unlock(final IgfsFileInfo info, final long modificationTime) throws IgniteCheckedException {
         assert validTxState(false);
         assert info != null;
 
         if (busyLock.enterBusy()) {
             try {
-                IgniteUuid lockId = info.lockId();
+                final IgniteUuid lockId = info.lockId();
 
                 if (lockId == null)
                     return;
 
                 // Temporary clear interrupted state for unlocking.
-                boolean interrupted = Thread.interrupted();
-
-                IgniteUuid fileId = info.id();
-
-                IgniteInternalTx tx = metaCache.txStartEx(PESSIMISTIC, REPEATABLE_READ);
+                final boolean interrupted = Thread.interrupted();
 
                 try {
-                    // Lock file ID for this transaction.
-                    IgfsFileInfo oldInfo = info(fileId);
+                    IgfsUtils.doInTransactionWithRetries(metaCache, new IgniteOutClosureX<Void>() {
+                        @Override public Void applyx() throws IgniteCheckedException {
+                            IgniteUuid fileId = info.id();
 
-                    if (oldInfo == null)
-                        throw fsException(new IgfsPathNotFoundException("Failed to unlock file (file not found): " + fileId));
+                            // Lock file ID for this transaction.
+                            IgfsFileInfo oldInfo = info(fileId);
 
-                    if (!info.lockId().equals(oldInfo.lockId()))
-                        throw new IgniteCheckedException("Failed to unlock file (inconsistent file lock ID) [fileId=" + fileId +
-                            ", lockId=" + info.lockId() + ", actualLockId=" + oldInfo.lockId() + ']');
+                            if (oldInfo == null)
+                                throw fsException(new IgfsPathNotFoundException("Failed to unlock file (file not found): " + fileId));
 
-                    IgfsFileInfo newInfo = new IgfsFileInfo(oldInfo, null, modificationTime);
+                            if (!info.lockId().equals(oldInfo.lockId()))
+                                throw new IgniteCheckedException("Failed to unlock file (inconsistent file lock ID) [fileId=" + fileId +
+                                    ", lockId=" + info.lockId() + ", actualLockId=" + oldInfo.lockId() + ']');
 
-                    boolean put = metaCache.put(fileId, newInfo);
+                            IgfsFileInfo newInfo = new IgfsFileInfo(oldInfo, null, modificationTime);
 
-                    assert put : "Value was not stored in cache [fileId=" + fileId + ", newInfo=" + newInfo + ']';
+                            boolean put = metaCache.put(fileId, newInfo);
 
-                    tx.commit();
-                }
-                catch (GridClosureException e) {
-                    throw U.cast(e);
+                            assert put : "Value was not stored in cache [fileId=" + fileId + ", newInfo=" + newInfo + ']';
+
+                            return null;
+                        }
+                    });
                 }
                 finally {
-                    tx.close();
-
                     assert validTxState(false);
 
                     if (interrupted)

http://git-wip-us.apache.org/repos/asf/ignite/blob/9fe3e8fd/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
index 8026a44..7449f31 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsUtils.java
@@ -18,18 +18,31 @@
 package org.apache.ignite.internal.processors.igfs;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.igfs.*;
 import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.processors.cache.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.transactions.*;
 import org.jetbrains.annotations.*;
 
 import java.lang.reflect.*;
 
+import static org.apache.ignite.IgniteSystemProperties.*;
+import static org.apache.ignite.transactions.TransactionConcurrency.*;
+import static org.apache.ignite.transactions.TransactionIsolation.*;
+
 /**
  * Common IGFS utility methods.
  */
 public class IgfsUtils {
+    /** Maximum number of cache transaction retries when topology changes. */
+    private static final int MAX_CACHE_TX_RETRIES = IgniteSystemProperties.getInteger(IGNITE_CACHE_RETRIES_COUNT, 100);
+
     /**
      * Converts any passed exception to IGFS exception.
      *
@@ -104,4 +117,43 @@ public class IgfsUtils {
 
         return user;
     }
+
+    /**
+     * Performs an operation within a cache transaction, retrying it when a cluster topology change is the cause of failure.
+     *
+     * @param cache Cache to do the transaction on.
+     * @param clo Closure.
+     * @return Result of closure execution.
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+    public static <T> T doInTransactionWithRetries(IgniteInternalCache cache, IgniteOutClosureX<T> clo)
+        throws IgniteCheckedException {
+        assert cache != null;
+
+        int attempts = 0;
+
+        while (attempts < MAX_CACHE_TX_RETRIES) {
+            try (Transaction tx = cache.txStart(PESSIMISTIC, REPEATABLE_READ)) {
+                T res = clo.applyx();
+
+                tx.commit();
+
+                return res;
+            }
+            catch (IgniteException | IgniteCheckedException e) {
+                ClusterTopologyException cte = X.cause(e, ClusterTopologyException.class);
+
+                if (cte != null)
+                    ((IgniteFutureImpl)cte.retryReadyFuture()).internalFuture().getUninterruptibly();
+                else
+                    throw U.cast(e);
+            }
+
+            attempts++;
+        }
+
+        throw new IgniteCheckedException("Failed to perform operation since max number of attempts " +
+            "exceeded. [maxAttempts=" + MAX_CACHE_TX_RETRIES + ']');
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9fe3e8fd/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
index 242e626..2adee90 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
@@ -126,6 +126,11 @@ public class GridFinishedFuture<T> implements IgniteInternalFuture<T> {
     }
 
     /** {@inheritDoc} */
+    @Override public T getUninterruptibly() throws IgniteCheckedException {
+        return get();
+    }
+
+    /** {@inheritDoc} */
     @Override public void listen(IgniteInClosure<? super IgniteInternalFuture<T>> lsnr) {
         assert lsnr != null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9fe3e8fd/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
index f8caf22..91ce549 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
@@ -107,6 +107,43 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
 
     /** {@inheritDoc} */
     @Override public R get() throws IgniteCheckedException {
+        return get0(ignoreInterrupts);
+    }
+
+    /** {@inheritDoc} */
+    @Override public R getUninterruptibly() throws IgniteCheckedException {
+        return get0(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override public R get(long timeout) throws IgniteCheckedException {
+        // Do not replace with static import, as it may not compile.
+        return get(timeout, TimeUnit.MILLISECONDS);
+    }
+
+    /** {@inheritDoc} */
+    @Override public R get(long timeout, TimeUnit unit) throws IgniteCheckedException {
+        A.ensure(timeout >= 0, "timeout cannot be negative: " + timeout);
+        A.notNull(unit, "unit");
+
+        try {
+            return get0(unit.toNanos(timeout));
+        }
+        catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+
+            throw new IgniteInterruptedCheckedException("Got interrupted while waiting for future to complete.", e);
+        }
+    }
+
+    /**
+     * Internal get routine.
+     *
+     * @param ignoreInterrupts Whether to ignore interrupts.
+     * @return Result.
+     * @throws IgniteCheckedException If failed.
+     */
+    private R get0(boolean ignoreInterrupts) throws IgniteCheckedException {
         try {
             if (endTime == 0) {
                 if (ignoreInterrupts)
@@ -132,27 +169,6 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
         }
     }
 
-    /** {@inheritDoc} */
-    @Override public R get(long timeout) throws IgniteCheckedException {
-        // Do not replace with static import, as it may not compile.
-        return get(timeout, TimeUnit.MILLISECONDS);
-    }
-
-    /** {@inheritDoc} */
-    @Override public R get(long timeout, TimeUnit unit) throws IgniteCheckedException {
-        A.ensure(timeout >= 0, "timeout cannot be negative: " + timeout);
-        A.notNull(unit, "unit");
-
-        try {
-            return get0(unit.toNanos(timeout));
-        }
-        catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-
-            throw new IgniteInterruptedCheckedException("Got interrupted while waiting for future to complete.", e);
-        }
-    }
-
     /**
      * @param nanosTimeout Timeout (nanoseconds).
      * @return Result.