You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/20 12:22:27 UTC
[1/5] ignite git commit: ignite-2893 For datastructures use invoke
instead of explicit txs, got rid of unnecessary outTx usage.
Repository: ignite
Updated Branches:
refs/heads/ignite-1794 09e342708 -> 45c38bece
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee955df9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
index 6724f78..e154850 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java
@@ -516,11 +516,10 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo
/**
* This method is used for synchronizing the reentrant lock state across all nodes.
*/
- protected boolean compareAndSetGlobalState(final int expVal, final int newVal,
+ boolean compareAndSetGlobalState(final int expVal, final int newVal,
final Thread newThread, final boolean bargingProhibited) {
try {
- return CU.outTx(
- retryTopologySafe(new Callable<Boolean>() {
+ return retryTopologySafe(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
try (GridNearTxLocal tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheLockState val = lockView.get(key);
@@ -594,9 +593,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo
throw e;
}
}
- }),
- ctx
- );
+ });
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -608,12 +605,11 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo
*
* @param cancelled true if acquire attempt is cancelled, false if acquire attempt should be registered.
*/
- protected boolean synchronizeQueue(final boolean cancelled, final Thread thread) {
+ boolean synchronizeQueue(final boolean cancelled, final Thread thread) {
final AtomicBoolean interrupted = new AtomicBoolean(false);
try {
- return CU.outTx(
- retryTopologySafe(new Callable<Boolean>() {
+ return retryTopologySafe(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
try (GridNearTxLocal tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheLockState val = lockView.get(key);
@@ -686,9 +682,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo
throw e;
}
}
- }),
- ctx
- );
+ });
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -704,13 +698,14 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo
* Sets the global state across all nodes after releasing the reentrant lock.
*
* @param newVal New state.
- * @param lastCondition Id of the condition await is called.
+ * @param lastCond Id of the condition on which await was called, or {@code null} if none.
* @param outgoingSignals Map containing signal calls on this node since the last acquisition of the lock.
*/
- protected boolean setGlobalState(final int newVal, @Nullable final String lastCondition, final Map<String, Integer> outgoingSignals) {
+ protected boolean setGlobalState(final int newVal,
+ @Nullable final String lastCond,
+ final Map<String, Integer> outgoingSignals) {
try {
- return CU.outTx(
- retryTopologySafe(new Callable<Boolean>() {
+ return retryTopologySafe(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
try (GridNearTxLocal tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheLockState val = lockView.get(key);
@@ -730,9 +725,9 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo
// If this lock is fair, remove this node from queue.
if (val.isFair() && newVal == 0) {
- UUID removedNode = val.getNodes().removeFirst();
+ UUID rmvdNode = val.getNodes().removeFirst();
- assert(thisNode.equals(removedNode));
+ assert(thisNode.equals(rmvdNode));
}
// Get global condition queue.
@@ -751,9 +746,8 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo
if (list != null && !list.isEmpty()) {
// Check if signalAll was called.
- if (cnt == 0) {
+ if (cnt == 0)
cnt = list.size();
- }
// Remove from global condition queue.
for (int i = 0; i < cnt; i++) {
@@ -787,20 +781,20 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo
// Check if this release is called after condition.await() call;
// If true, add this node to the global waiting queue.
- if (lastCondition != null) {
+ if (lastCond != null) {
LinkedList<UUID> queue;
//noinspection IfMayBeConditional
- if (!condMap.containsKey(lastCondition))
+ if (!condMap.containsKey(lastCond))
// New condition object.
queue = new LinkedList<>();
else
// Existing condition object.
- queue = condMap.get(lastCondition);
+ queue = condMap.get(lastCond);
queue.add(thisNode);
- condMap.put(lastCondition, queue);
+ condMap.put(lastCond, queue);
}
val.setConditionMap(condMap);
@@ -824,16 +818,14 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo
throw e;
}
}
- }),
- ctx
- );
+ });
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
}
}
- protected synchronized boolean checkIncomingSignals(GridCacheLockState state) {
+ synchronized boolean checkIncomingSignals(GridCacheLockState state) {
if (state.getSignals() == null)
return false;
@@ -882,16 +874,16 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo
private final String name;
/** */
- private final AbstractQueuedSynchronizer.ConditionObject object;
+ private final AbstractQueuedSynchronizer.ConditionObject obj;
/**
* @param name Condition name.
- * @param object Condition object.
+ * @param obj Condition object.
*/
- protected IgniteConditionObject(String name, ConditionObject object) {
+ protected IgniteConditionObject(String name, ConditionObject obj) {
this.name = name;
- this.object = object;
+ this.obj = obj;
}
/**
@@ -913,7 +905,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo
lastCondition = name;
- object.await();
+ obj.await();
sync.validate(true);
}
@@ -935,7 +927,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo
lastCondition = name;
- object.awaitUninterruptibly();
+ obj.awaitUninterruptibly();
sync.validate(false);
}
@@ -954,7 +946,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo
lastCondition = name;
- long result = object.awaitNanos(nanosTimeout);
+ long result = obj.awaitNanos(nanosTimeout);
sync.validate(true);
@@ -978,7 +970,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo
lastCondition = name;
- boolean result = object.await(time, unit);
+ boolean result = obj.await(time, unit);
sync.validate(true);
@@ -1002,7 +994,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo
lastCondition = name;
- boolean result = object.awaitUntil(deadline);
+ boolean result = obj.awaitUntil(deadline);
sync.validate(true);
@@ -1087,8 +1079,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo
private void initializeReentrantLock() throws IgniteCheckedException {
if (initGuard.compareAndSet(false, true)) {
try {
- sync = CU.outTx(
- retryTopologySafe(new Callable<Sync>() {
+ sync = retryTopologySafe(new Callable<Sync>() {
@Override public Sync call() throws Exception {
try (GridNearTxLocal tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheLockState val = lockView.get(key);
@@ -1105,9 +1096,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo
return new Sync(val);
}
}
- }),
- ctx
- );
+ });
if (log.isDebugEnabled())
log.debug("Initialized internal sync structure: " + sync);
@@ -1138,7 +1127,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo
return;
// Check if update came from this node.
- boolean local = sync.isLockedLocally(val.getId());
+ boolean loc = sync.isLockedLocally(val.getId());
// Process any incoming signals.
boolean incomingSignals = sync.checkIncomingSignals(val);
@@ -1153,7 +1142,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo
sync.setCurrentOwnerThread(val.getThreadId());
// Check if any threads waiting on this node need to be notified.
- if ((incomingSignals || sync.getPermits() == 0) && !local) {
+ if ((incomingSignals || sync.getPermits() == 0) && !loc) {
// Try to notify any waiting threads.
sync.release(0);
}
@@ -1171,9 +1160,8 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo
if (nodeId.equals(sync.getOwnerNode())) {
sync.setBroken(true);
- if (!sync.failoverSafe) {
+ if (!sync.failoverSafe)
sync.interruptAll();
- }
}
// Try to notify any waiting threads.
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee955df9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java
index e38c772..b31a154 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java
@@ -25,14 +25,12 @@ import java.io.ObjectOutput;
import java.io.ObjectStreamException;
import java.util.Collection;
import java.util.Iterator;
-import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteQueue;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheGateway;
-import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteCallable;
@@ -93,18 +91,8 @@ public class GridCacheQueueProxy<T> implements IgniteQueue<T>, Externalizable {
gate.enter();
try {
- if (cctx.transactional())
- return CU.outTx(new Callable<Boolean>() {
- @Override public Boolean call() throws Exception {
- return delegate.add(item);
- }
- }, cctx);
-
return delegate.add(item);
}
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
finally {
gate.leave();
}
@@ -115,18 +103,8 @@ public class GridCacheQueueProxy<T> implements IgniteQueue<T>, Externalizable {
gate.enter();
try {
- if (cctx.transactional())
- return CU.outTx(new Callable<Boolean>() {
- @Override public Boolean call() throws Exception {
- return delegate.offer(item);
- }
- }, cctx);
-
return delegate.offer(item);
}
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
finally {
gate.leave();
}
@@ -137,18 +115,8 @@ public class GridCacheQueueProxy<T> implements IgniteQueue<T>, Externalizable {
gate.enter();
try {
- if (cctx.transactional())
- return CU.outTx(new Callable<Boolean>() {
- @Override public Boolean call() throws Exception {
- return delegate.addAll(items);
- }
- }, cctx);
-
return delegate.addAll(items);
}
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
finally {
gate.leave();
}
@@ -160,18 +128,8 @@ public class GridCacheQueueProxy<T> implements IgniteQueue<T>, Externalizable {
gate.enter();
try {
- if (cctx.transactional())
- return CU.outTx(new Callable<Boolean>() {
- @Override public Boolean call() throws Exception {
- return delegate.contains(item);
- }
- }, cctx);
-
return delegate.contains(item);
}
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
finally {
gate.leave();
}
@@ -182,18 +140,8 @@ public class GridCacheQueueProxy<T> implements IgniteQueue<T>, Externalizable {
gate.enter();
try {
- if (cctx.transactional())
- return CU.outTx(new Callable<Boolean>() {
- @Override public Boolean call() throws Exception {
- return delegate.containsAll(items);
- }
- }, cctx);
-
return delegate.containsAll(items);
}
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
finally {
gate.leave();
}
@@ -204,20 +152,7 @@ public class GridCacheQueueProxy<T> implements IgniteQueue<T>, Externalizable {
gate.enter();
try {
- if (cctx.transactional()) {
- CU.outTx(new Callable<Void>() {
- @Override public Void call() throws Exception {
- delegate.clear();
-
- return null;
- }
- }, cctx);
- }
- else
- delegate.clear();
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
+ delegate.clear();
}
finally {
gate.leave();
@@ -230,18 +165,8 @@ public class GridCacheQueueProxy<T> implements IgniteQueue<T>, Externalizable {
gate.enter();
try {
- if (cctx.transactional())
- return CU.outTx(new Callable<Boolean>() {
- @Override public Boolean call() throws Exception {
- return delegate.remove(item);
- }
- }, cctx);
-
return delegate.remove(item);
}
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
finally {
gate.leave();
}
@@ -252,18 +177,8 @@ public class GridCacheQueueProxy<T> implements IgniteQueue<T>, Externalizable {
gate.enter();
try {
- if (cctx.transactional())
- return CU.outTx(new Callable<Boolean>() {
- @Override public Boolean call() throws Exception {
- return delegate.removeAll(items);
- }
- }, cctx);
-
return delegate.removeAll(items);
}
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
finally {
gate.leave();
}
@@ -274,18 +189,8 @@ public class GridCacheQueueProxy<T> implements IgniteQueue<T>, Externalizable {
gate.enter();
try {
- if (cctx.transactional())
- return CU.outTx(new Callable<Boolean>() {
- @Override public Boolean call() throws Exception {
- return delegate.isEmpty();
- }
- }, cctx);
-
return delegate.isEmpty();
}
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
finally {
gate.leave();
}
@@ -296,18 +201,8 @@ public class GridCacheQueueProxy<T> implements IgniteQueue<T>, Externalizable {
gate.enter();
try {
- if (cctx.transactional())
- return CU.outTx(new Callable<Iterator<T>>() {
- @Override public Iterator<T> call() throws Exception {
- return delegate.iterator();
- }
- }, cctx);
-
return delegate.iterator();
}
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
finally {
gate.leave();
}
@@ -318,18 +213,8 @@ public class GridCacheQueueProxy<T> implements IgniteQueue<T>, Externalizable {
gate.enter();
try {
- if (cctx.transactional())
- return CU.outTx(new Callable<Object[]>() {
- @Override public Object[] call() throws Exception {
- return delegate.toArray();
- }
- }, cctx);
-
return delegate.toArray();
}
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
finally {
gate.leave();
}
@@ -341,18 +226,8 @@ public class GridCacheQueueProxy<T> implements IgniteQueue<T>, Externalizable {
gate.enter();
try {
- if (cctx.transactional())
- return CU.outTx(new Callable<T1[]>() {
- @Override public T1[] call() throws Exception {
- return delegate.toArray(a);
- }
- }, cctx);
-
return delegate.toArray(a);
}
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
finally {
gate.leave();
}
@@ -363,18 +238,8 @@ public class GridCacheQueueProxy<T> implements IgniteQueue<T>, Externalizable {
gate.enter();
try {
- if (cctx.transactional())
- return CU.outTx(new Callable<Boolean>() {
- @Override public Boolean call() throws Exception {
- return delegate.retainAll(items);
- }
- }, cctx);
-
return delegate.retainAll(items);
}
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
finally {
gate.leave();
}
@@ -385,18 +250,8 @@ public class GridCacheQueueProxy<T> implements IgniteQueue<T>, Externalizable {
gate.enter();
try {
- if (cctx.transactional())
- return CU.outTx(new Callable<Integer>() {
- @Override public Integer call() throws Exception {
- return delegate.size();
- }
- }, cctx);
-
return delegate.size();
}
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
finally {
gate.leave();
}
@@ -407,18 +262,8 @@ public class GridCacheQueueProxy<T> implements IgniteQueue<T>, Externalizable {
gate.enter();
try {
- if (cctx.transactional())
- return CU.outTx(new Callable<T>() {
- @Override public T call() throws Exception {
- return delegate.poll();
- }
- }, cctx);
-
return delegate.poll();
}
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
finally {
gate.leave();
}
@@ -429,18 +274,8 @@ public class GridCacheQueueProxy<T> implements IgniteQueue<T>, Externalizable {
gate.enter();
try {
- if (cctx.transactional())
- return CU.outTx(new Callable<T>() {
- @Override public T call() throws Exception {
- return delegate.peek();
- }
- }, cctx);
-
return delegate.peek();
}
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
finally {
gate.leave();
}
@@ -451,20 +286,7 @@ public class GridCacheQueueProxy<T> implements IgniteQueue<T>, Externalizable {
gate.enter();
try {
- if (cctx.transactional()) {
- CU.outTx(new Callable<Void>() {
- @Override public Void call() throws Exception {
- delegate.clear(batchSize);
-
- return null;
- }
- }, cctx);
- }
- else
- delegate.clear(batchSize);
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
+ delegate.clear(batchSize);
}
finally {
gate.leave();
@@ -476,18 +298,8 @@ public class GridCacheQueueProxy<T> implements IgniteQueue<T>, Externalizable {
gate.enter();
try {
- if (cctx.transactional())
- return CU.outTx(new Callable<Integer>() {
- @Override public Integer call() throws Exception {
- return delegate.remainingCapacity();
- }
- }, cctx);
-
return delegate.remainingCapacity();
}
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
finally {
gate.leave();
}
@@ -498,18 +310,8 @@ public class GridCacheQueueProxy<T> implements IgniteQueue<T>, Externalizable {
gate.enter();
try {
- if (cctx.transactional())
- return CU.outTx(new Callable<Integer>() {
- @Override public Integer call() throws Exception {
- return delegate.drainTo(c);
- }
- }, cctx);
-
return delegate.drainTo(c);
}
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
finally {
gate.leave();
}
@@ -520,18 +322,8 @@ public class GridCacheQueueProxy<T> implements IgniteQueue<T>, Externalizable {
gate.enter();
try {
- if (cctx.transactional())
- return CU.outTx(new Callable<Integer>() {
- @Override public Integer call() throws Exception {
- return delegate.drainTo(c, maxElements);
- }
- }, cctx);
-
return delegate.drainTo(c, maxElements);
}
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
finally {
gate.leave();
}
@@ -542,18 +334,8 @@ public class GridCacheQueueProxy<T> implements IgniteQueue<T>, Externalizable {
gate.enter();
try {
- if (cctx.transactional())
- return CU.outTx(new Callable<T>() {
- @Override public T call() throws Exception {
- return delegate.remove();
- }
- }, cctx);
-
return delegate.remove();
}
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
finally {
gate.leave();
}
@@ -564,18 +346,8 @@ public class GridCacheQueueProxy<T> implements IgniteQueue<T>, Externalizable {
gate.enter();
try {
- if (cctx.transactional())
- return CU.outTx(new Callable<T>() {
- @Override public T call() throws Exception {
- return delegate.element();
- }
- }, cctx);
-
return delegate.element();
}
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
finally {
gate.leave();
}
@@ -586,20 +358,7 @@ public class GridCacheQueueProxy<T> implements IgniteQueue<T>, Externalizable {
gate.enter();
try {
- if (cctx.transactional()) {
- CU.outTx(new Callable<Void>() {
- @Override public Void call() throws Exception {
- delegate.put(item);
-
- return null;
- }
- }, cctx);
- }
- else
- delegate.put(item);
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
+ delegate.put(item);
}
finally {
gate.leave();
@@ -611,18 +370,8 @@ public class GridCacheQueueProxy<T> implements IgniteQueue<T>, Externalizable {
gate.enter();
try {
- if (cctx.transactional())
- return CU.outTx(new Callable<Boolean>() {
- @Override public Boolean call() throws Exception {
- return delegate.offer(item, timeout, unit);
- }
- }, cctx);
-
return delegate.offer(item, timeout, unit);
}
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
finally {
gate.leave();
}
@@ -633,18 +382,8 @@ public class GridCacheQueueProxy<T> implements IgniteQueue<T>, Externalizable {
gate.enter();
try {
- if (cctx.transactional())
- return CU.outTx(new Callable<T>() {
- @Override public T call() throws Exception {
- return delegate.take();
- }
- }, cctx);
-
return delegate.take();
}
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
finally {
gate.leave();
}
@@ -655,18 +394,8 @@ public class GridCacheQueueProxy<T> implements IgniteQueue<T>, Externalizable {
gate.enter();
try {
- if (cctx.transactional())
- return CU.outTx(new Callable<T>() {
- @Override public T call() throws Exception {
- return delegate.poll(timeout, unit);
- }
- }, cctx);
-
return delegate.poll(timeout, unit);
}
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
finally {
gate.leave();
}
@@ -677,20 +406,7 @@ public class GridCacheQueueProxy<T> implements IgniteQueue<T>, Externalizable {
gate.enter();
try {
- if (cctx.transactional()) {
- CU.outTx(new Callable<Void>() {
- @Override public Void call() throws Exception {
- delegate.close();
-
- return null;
- }
- }, cctx);
- }
- else
- delegate.close();
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
+ delegate.close();
}
finally {
gate.leave();
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee955df9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
index 0039fa2..edc322e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
@@ -109,6 +109,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit
* Synchronization implementation for semaphore. Uses AQS state to represent permits.
*/
final class Sync extends AbstractQueuedSynchronizer {
+ /** */
private static final long serialVersionUID = 1192457210091910933L;
/** Map containing number of acquired permits for each node waiting on this semaphore. */
@@ -132,7 +133,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit
*
* @param nodeMap NodeMap.
*/
- protected synchronized void setWaiters(Map<UUID, Integer> nodeMap) {
+ synchronized void setWaiters(Map<UUID, Integer> nodeMap) {
this.nodeMap = nodeMap;
}
@@ -141,7 +142,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit
*
* @return Number of nodes waiting at this semaphore.
*/
- public int getWaiters() {
+ int getWaiters() {
int totalWaiters = 0;
for (Integer i : nodeMap.values()) {
@@ -159,7 +160,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit
* @return Number of permits node has acquired at this semaphore. Can be less than 0 if more permits were
* released than acquired on node.
*/
- public int getPermitsForNode(UUID nodeID) {
+ int getPermitsForNode(UUID nodeID) {
return nodeMap.containsKey(nodeID) ? nodeMap.get(nodeID) : 0;
}
@@ -220,9 +221,8 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit
int remaining = available - acquires;
- if (remaining < 0 || compareAndSetGlobalState(available, remaining, false)) {
+ if (remaining < 0 || compareAndSetGlobalState(available, remaining, false))
return remaining;
- }
}
}
@@ -270,10 +270,10 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit
if (broken)
return 1;
- int current = getState();
+ int curr = getState();
- if (current == 0 || compareAndSetGlobalState(current, 0, true))
- return current;
+ if (curr == 0 || compareAndSetGlobalState(curr, 0, true))
+ return curr;
}
}
@@ -285,10 +285,9 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit
* @param draining True if used for draining the permits.
* @return True if this is the call that succeeded to change the global state.
*/
- protected boolean compareAndSetGlobalState(final int expVal, final int newVal, final boolean draining) {
+ boolean compareAndSetGlobalState(final int expVal, final int newVal, final boolean draining) {
try {
- return CU.outTx(
- retryTopologySafe(new Callable<Boolean>() {
+ return retryTopologySafe(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
try (GridNearTxLocal tx = CU.txStartInternal(ctx,
semView,
@@ -343,9 +342,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit
throw e;
}
}
- }),
- ctx
- );
+ });
}
catch (IgniteCheckedException e) {
if (ctx.kernalContext().isStopping()) {
@@ -367,10 +364,9 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit
* @param broken Flag indicating that this semaphore is broken.
* @return True if this is the call that succeeded to change the global state.
*/
- protected boolean releaseFailedNode(final UUID nodeId, final boolean broken) {
+ boolean releaseFailedNode(final UUID nodeId, final boolean broken) {
try {
- return CU.outTx(
- retryTopologySafe(new Callable<Boolean>() {
+ return retryTopologySafe(new Callable<Boolean>() {
@Override public Boolean call() throws Exception {
try (
GridNearTxLocal tx = CU.txStartInternal(ctx,
@@ -434,9 +430,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit
throw e;
}
}
- }),
- ctx
- );
+ });
}
catch (IgniteCheckedException e) {
if (ctx.kernalContext().isStopping()) {
@@ -484,8 +478,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit
private void initializeSemaphore() throws IgniteCheckedException {
if (!initGuard.get() && initGuard.compareAndSet(false, true)) {
try {
- sync = CU.outTx(
- retryTopologySafe(new Callable<Sync>() {
+ sync = retryTopologySafe(new Callable<Sync>() {
@Override public Sync call() throws Exception {
try (GridNearTxLocal tx = CU.txStartInternal(ctx,
semView, PESSIMISTIC, REPEATABLE_READ)) {
@@ -513,9 +506,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit
return sync;
}
}
- }),
- ctx
- );
+ });
if (log.isDebugEnabled())
log.debug("Initialized internal sync structure: " + sync);
@@ -730,8 +721,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit
try {
initializeSemaphore();
- ret = CU.outTx(
- retryTopologySafe(new Callable<Integer>() {
+ ret = retryTopologySafe(new Callable<Integer>() {
@Override public Integer call() throws Exception {
try (
GridNearTxLocal tx = CU.txStartInternal(ctx,
@@ -749,9 +739,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit
return cnt;
}
}
- }),
- ctx
- );
+ });
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -787,7 +775,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit
try {
initializeSemaphore();
- boolean result = sync.nonfairTryAcquireShared(1) >= 0;
+ boolean res = sync.nonfairTryAcquireShared(1) >= 0;
if (isBroken()) {
Thread.interrupted(); // Clear interrupt flag.
@@ -795,7 +783,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit
throw new InterruptedException();
}
- return result;
+ return res;
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -815,7 +803,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit
try {
initializeSemaphore();
- boolean result = sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
+ boolean res = sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
if (isBroken()) {
Thread.interrupted(); // Clear interrupt flag.
@@ -823,7 +811,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit
throw new InterruptedException();
}
- return result;
+ return res;
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee955df9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
index 0eb6307..3dfa71f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java
@@ -25,14 +25,12 @@ import java.io.ObjectOutput;
import java.io.ObjectStreamException;
import java.util.Collection;
import java.util.Iterator;
-import java.util.concurrent.Callable;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteSet;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheGateway;
import org.apache.ignite.internal.util.GridSpinBusyLock;
-import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteCallable;
@@ -113,18 +111,8 @@ public class GridCacheSetProxy<T> implements IgniteSet<T>, Externalizable {
gate.enter();
try {
- if (cctx.transactional())
- return CU.outTx(new Callable<Integer>() {
- @Override public Integer call() throws Exception {
- return delegate.size();
- }
- }, cctx);
-
return delegate.size();
}
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
finally {
gate.leave();
}
@@ -142,18 +130,8 @@ public class GridCacheSetProxy<T> implements IgniteSet<T>, Externalizable {
gate.enter();
try {
- if (cctx.transactional())
- return CU.outTx(new Callable<Boolean>() {
- @Override public Boolean call() throws Exception {
- return delegate.isEmpty();
- }
- }, cctx);
-
return delegate.isEmpty();
}
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
finally {
gate.leave();
}
@@ -171,18 +149,8 @@ public class GridCacheSetProxy<T> implements IgniteSet<T>, Externalizable {
gate.enter();
try {
- if (cctx.transactional())
- return CU.outTx(new Callable<Boolean>() {
- @Override public Boolean call() throws Exception {
- return delegate.contains(o);
- }
- }, cctx);
-
return delegate.contains(o);
}
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
finally {
gate.leave();
}
@@ -200,18 +168,8 @@ public class GridCacheSetProxy<T> implements IgniteSet<T>, Externalizable {
gate.enter();
try {
- if (cctx.transactional())
- return CU.outTx(new Callable<Object[]>() {
- @Override public Object[] call() throws Exception {
- return delegate.toArray();
- }
- }, cctx);
-
return delegate.toArray();
}
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
finally {
gate.leave();
}
@@ -229,18 +187,8 @@ public class GridCacheSetProxy<T> implements IgniteSet<T>, Externalizable {
gate.enter();
try {
- if (cctx.transactional())
- return CU.outTx(new Callable<T1[]>() {
- @Override public T1[] call() throws Exception {
- return delegate.toArray(a);
- }
- }, cctx);
-
return delegate.toArray(a);
}
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
finally {
gate.leave();
}
@@ -258,18 +206,8 @@ public class GridCacheSetProxy<T> implements IgniteSet<T>, Externalizable {
gate.enter();
try {
- if (cctx.transactional())
- return CU.outTx(new Callable<Boolean>() {
- @Override public Boolean call() throws Exception {
- return delegate.add(t);
- }
- }, cctx);
-
return delegate.add(t);
}
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
finally {
gate.leave();
}
@@ -287,18 +225,8 @@ public class GridCacheSetProxy<T> implements IgniteSet<T>, Externalizable {
gate.enter();
try {
- if (cctx.transactional())
- return CU.outTx(new Callable<Boolean>() {
- @Override public Boolean call() throws Exception {
- return delegate.remove(o);
- }
- }, cctx);
-
return delegate.remove(o);
}
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
finally {
gate.leave();
}
@@ -316,18 +244,8 @@ public class GridCacheSetProxy<T> implements IgniteSet<T>, Externalizable {
gate.enter();
try {
- if (cctx.transactional())
- return CU.outTx(new Callable<Boolean>() {
- @Override public Boolean call() throws Exception {
- return delegate.containsAll(c);
- }
- }, cctx);
-
return delegate.containsAll(c);
}
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
finally {
gate.leave();
}
@@ -345,18 +263,8 @@ public class GridCacheSetProxy<T> implements IgniteSet<T>, Externalizable {
gate.enter();
try {
- if (cctx.transactional())
- return CU.outTx(new Callable<Boolean>() {
- @Override public Boolean call() throws Exception {
- return delegate.addAll(c);
- }
- }, cctx);
-
return delegate.addAll(c);
}
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
finally {
gate.leave();
}
@@ -374,18 +282,8 @@ public class GridCacheSetProxy<T> implements IgniteSet<T>, Externalizable {
gate.enter();
try {
- if (cctx.transactional())
- return CU.outTx(new Callable<Boolean>() {
- @Override public Boolean call() throws Exception {
- return delegate.retainAll(c);
- }
- }, cctx);
-
return delegate.retainAll(c);
}
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
finally {
gate.leave();
}
@@ -403,18 +301,8 @@ public class GridCacheSetProxy<T> implements IgniteSet<T>, Externalizable {
gate.enter();
try {
- if (cctx.transactional())
- return CU.outTx(new Callable<Boolean>() {
- @Override public Boolean call() throws Exception {
- return delegate.removeAll(c);
- }
- }, cctx);
-
return delegate.removeAll(c);
}
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
finally {
gate.leave();
}
@@ -432,20 +320,7 @@ public class GridCacheSetProxy<T> implements IgniteSet<T>, Externalizable {
gate.enter();
try {
- if (cctx.transactional()) {
- CU.outTx(new Callable<Void>() {
- @Override public Void call() throws Exception {
- delegate.clear();
-
- return null;
- }
- }, cctx);
- }
- else
- delegate.clear();
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
+ delegate.clear();
}
finally {
gate.leave();
@@ -464,18 +339,8 @@ public class GridCacheSetProxy<T> implements IgniteSet<T>, Externalizable {
gate.enter();
try {
- if (cctx.transactional())
- return CU.outTx(new Callable<Iterator<T>>() {
- @Override public Iterator<T> call() throws Exception {
- return delegate.iterator();
- }
- }, cctx);
-
return delegate.iterator();
}
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
finally {
gate.leave();
}
@@ -490,20 +355,7 @@ public class GridCacheSetProxy<T> implements IgniteSet<T>, Externalizable {
gate.enter();
try {
- if (cctx.transactional()) {
- CU.outTx(new Callable<Void>() {
- @Override public Void call() throws Exception {
- delegate.close();
-
- return null;
- }
- }, cctx);
- }
- else
- delegate.close();
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
+ delegate.close();
}
finally {
gate.leave();
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee955df9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
index 846eb69..6a19281 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java
@@ -75,7 +75,7 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
return retVal;
}
}
- }).call();
+ });
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -120,7 +120,7 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
}
}
}
- }).call();
+ });
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -169,7 +169,7 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
return retVal;
}
}
- }).call();
+ });
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -202,7 +202,7 @@ public class GridTransactionalCacheQueueImpl<T> extends GridCacheQueueAdapter<T>
return null;
}
- }).call();
+ });
}
catch (RuntimeException e) {
throw e;
[2/5] ignite git commit: ignite-2893 For datastructures use invoke
instead of explicit txs, got rid of unnecessary outTx usage.
Posted by sb...@apache.org.
ignite-2893 For datastructures use invoke instead of explicit txs, got rid of unnecessary outTx usage.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ee955df9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ee955df9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ee955df9
Branch: refs/heads/ignite-1794
Commit: ee955df9fb80737292aac5f7ad3c82f8f0d8ea8e
Parents: f440480
Author: sboikov <sb...@gridgain.com>
Authored: Thu Apr 20 13:10:28 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Apr 20 13:10:28 2017 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAdapter.java | 4 +-
.../processors/cache/GridCacheUtils.java | 117 ++--
.../datastructures/DataStructuresProcessor.java | 61 +-
.../datastructures/GridCacheAtomicLongImpl.java | 626 +++++++++++--------
.../GridCacheAtomicReferenceImpl.java | 276 ++++----
.../GridCacheAtomicSequenceImpl.java | 88 +--
.../GridCacheAtomicStampedImpl.java | 293 ++++-----
.../GridCacheCountDownLatchImpl.java | 56 +-
.../datastructures/GridCacheLockImpl.java | 80 +--
.../datastructures/GridCacheQueueProxy.java | 292 +--------
.../datastructures/GridCacheSemaphoreImpl.java | 56 +-
.../datastructures/GridCacheSetProxy.java | 152 +----
.../GridTransactionalCacheQueueImpl.java | 8 +-
13 files changed, 812 insertions(+), 1297 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee955df9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index a3d4c81..5438163 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -2451,7 +2451,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKey(key);
return syncOp(new SyncOp<EntryProcessorResult<T>>(true) {
- @Nullable @Override public EntryProcessorResult<T> op(GridNearTxLocal tx)
+ @Override public EntryProcessorResult<T> op(GridNearTxLocal tx)
throws IgniteCheckedException {
assert topVer == null || tx.implicit();
@@ -2489,7 +2489,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
validateCacheKeys(keys);
return syncOp(new SyncOp<Map<K, EntryProcessorResult<T>>>(keys.size() == 1) {
- @Nullable @Override public Map<K, EntryProcessorResult<T>> op(GridNearTxLocal tx)
+ @Override public Map<K, EntryProcessorResult<T>> op(GridNearTxLocal tx)
throws IgniteCheckedException {
Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys,
new C1<K, EntryProcessor<K, V, Object>>() {
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee955df9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 5abb6de..df9c7c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -881,31 +881,6 @@ public class GridCacheUtils {
}
/**
- * Method executes any Callable out of scope of transaction.
- * If transaction started by this thread {@code cmd} will be executed in another thread.
- *
- * @param cmd Callable.
- * @param ctx Cache context.
- * @return T Callable result.
- * @throws IgniteCheckedException If execution failed.
- */
- public static <T> T outTx(Callable<T> cmd, GridCacheContext ctx) throws IgniteCheckedException {
- if (ctx.tm().inUserTx())
- return ctx.closures().callLocalSafe(cmd, false).get();
- else {
- try {
- return cmd.call();
- }
- catch (IgniteCheckedException | IgniteException | IllegalStateException e) {
- throw e;
- }
- catch (Exception e) {
- throw new IgniteCheckedException(e);
- }
- }
- }
-
- /**
* @param val Value.
* @param skip Skip value flag.
* @return Value.
@@ -1604,56 +1579,58 @@ public class GridCacheUtils {
/**
* @param c Closure to retry.
- * @param <S> Closure type.
- * @return Wrapped closure.
- */
- public static <S> Callable<S> retryTopologySafe(final Callable<S> c ) {
- return new Callable<S>() {
- @Override public S call() throws Exception {
- IgniteCheckedException err = null;
-
- for (int i = 0; i < GridCacheAdapter.MAX_RETRIES; i++) {
- try {
- return c.call();
- }
- catch (ClusterGroupEmptyCheckedException | ClusterTopologyServerNotFoundException e) {
+ * @throws IgniteCheckedException If failed.
+ * @return Closure result.
+ */
+ public static <S> S retryTopologySafe(final Callable<S> c) throws IgniteCheckedException {
+ IgniteCheckedException err = null;
+
+ for (int i = 0; i < GridCacheAdapter.MAX_RETRIES; i++) {
+ try {
+ return c.call();
+ }
+ catch (ClusterGroupEmptyCheckedException | ClusterTopologyServerNotFoundException e) {
+ throw e;
+ }
+ catch (TransactionRollbackException e) {
+ if (i + 1 == GridCacheAdapter.MAX_RETRIES)
+ throw e;
+
+ U.sleep(1);
+ }
+ catch (IgniteCheckedException e) {
+ if (i + 1 == GridCacheAdapter.MAX_RETRIES)
+ throw e;
+
+ if (X.hasCause(e, ClusterTopologyCheckedException.class)) {
+ ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class);
+
+ if (topErr instanceof ClusterGroupEmptyCheckedException || topErr instanceof
+ ClusterTopologyServerNotFoundException)
throw e;
- }
- catch (TransactionRollbackException e) {
- if (i + 1 == GridCacheAdapter.MAX_RETRIES)
- throw e;
+ // IGNITE-1948: remove this check when the issue is fixed
+ if (topErr.retryReadyFuture() != null)
+ topErr.retryReadyFuture().get();
+ else
U.sleep(1);
- }
- catch (IgniteCheckedException e) {
- if (i + 1 == GridCacheAdapter.MAX_RETRIES)
- throw e;
-
- if (X.hasCause(e, ClusterTopologyCheckedException.class)) {
- ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class);
-
- if (topErr instanceof ClusterGroupEmptyCheckedException || topErr instanceof
- ClusterTopologyServerNotFoundException)
- throw e;
-
- // IGNITE-1948: remove this check when the issue is fixed
- if (topErr.retryReadyFuture() != null)
- topErr.retryReadyFuture().get();
- else
- U.sleep(1);
- }
- else if (X.hasCause(e, IgniteTxRollbackCheckedException.class,
- CachePartialUpdateCheckedException.class))
- U.sleep(1);
- else
- throw e;
- }
}
-
- // Should never happen.
- throw err;
+ else if (X.hasCause(e, IgniteTxRollbackCheckedException.class,
+ CachePartialUpdateCheckedException.class))
+ U.sleep(1);
+ else
+ throw e;
}
- };
+ catch (RuntimeException e) {
+ throw e;
+ }
+ catch (Exception e) {
+ throw new IgniteCheckedException(e);
+ }
+ }
+
+ // Should never happen.
+ throw err;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee955df9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 102db96..0a439dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -476,7 +476,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen
* @param name Sequence name.
* @throws IgniteCheckedException If removing failed.
*/
- public final void removeSequence(final String name) throws IgniteCheckedException {
+ final void removeSequence(final String name) throws IgniteCheckedException {
assert name != null;
awaitInitialization();
@@ -488,9 +488,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen
dsCacheCtx.gate().enter();
try {
- GridCacheInternal key = new GridCacheInternalKeyImpl(name);
-
- removeInternal(key, GridCacheAtomicSequenceValue.class);
+ dsView.remove(new GridCacheInternalKeyImpl(name));
}
finally {
dsCacheCtx.gate().leave();
@@ -631,7 +629,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen
* @param name Atomic long name.
* @throws IgniteCheckedException If removing failed.
*/
- public final void removeAtomicLong(final String name) throws IgniteCheckedException {
+ final void removeAtomicLong(final String name) throws IgniteCheckedException {
assert name != null;
assert dsCacheCtx != null;
@@ -642,7 +640,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen
dsCacheCtx.gate().enter();
try {
- removeInternal(new GridCacheInternalKeyImpl(name), GridCacheAtomicLongValue.class);
+ dsView.remove(new GridCacheInternalKeyImpl(name));
}
finally {
dsCacheCtx.gate().leave();
@@ -790,7 +788,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen
* @param name Atomic reference name.
* @throws IgniteCheckedException If removing failed.
*/
- public final void removeAtomicReference(final String name) throws IgniteCheckedException {
+ final void removeAtomicReference(final String name) throws IgniteCheckedException {
assert name != null;
assert dsCacheCtx != null;
@@ -801,9 +799,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen
dsCacheCtx.gate().enter();
try {
- GridCacheInternal key = new GridCacheInternalKeyImpl(name);
-
- removeInternal(key, GridCacheAtomicReferenceValue.class);
+ dsView.remove(new GridCacheInternalKeyImpl(name));
}
finally {
dsCacheCtx.gate().leave();
@@ -894,7 +890,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen
* @param name Atomic stamped name.
* @throws IgniteCheckedException If removing failed.
*/
- public final void removeAtomicStamped(final String name) throws IgniteCheckedException {
+ final void removeAtomicStamped(final String name) throws IgniteCheckedException {
assert name != null;
assert dsCacheCtx != null;
@@ -905,9 +901,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen
dsCacheCtx.gate().enter();
try {
- GridCacheInternal key = new GridCacheInternalKeyImpl(name);
-
- removeInternal(key, GridCacheAtomicStampedValue.class);
+ dsView.remove(new GridCacheInternalKeyImpl(name));
}
finally {
dsCacheCtx.gate().leave();
@@ -1516,43 +1510,6 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen
}
/**
- * Remove internal entry by key from cache.
- *
- * @param key Internal entry key.
- * @param cls Class of object which will be removed. If cached object has different type exception will be thrown.
- * @return Method returns true if sequence has been removed and false if it's not cached.
- * @throws IgniteCheckedException If removing failed or class of object is different to expected class.
- */
- private <R> boolean removeInternal(final GridCacheInternal key, final Class<R> cls) throws IgniteCheckedException {
- return CU.outTx(
- new Callable<Boolean>() {
- @Override public Boolean call() throws Exception {
- try (GridNearTxLocal tx = CU.txStartInternal(dsCacheCtx, dsView, PESSIMISTIC, REPEATABLE_READ)) {
- // Check correctness type of removable object.
- R val = cast(dsView.get(key), cls);
-
- if (val != null) {
- dsView.remove(key);
-
- tx.commit();
- }
- else
- tx.setRollbackOnly();
-
- return val != null;
- }
- catch (Error | Exception e) {
- U.error(log, "Failed to remove data structure: " + key, e);
-
- throw e;
- }
- }
- },
- dsCacheCtx
- );
- }
-
- /**
*
*/
static class DataStructuresEntryFilter implements CacheEntryEventSerializableFilter<Object, Object> {
@@ -1769,7 +1726,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter implemen
*/
public static <R> R retry(IgniteLogger log, Callable<R> call) throws IgniteCheckedException {
try {
- return GridCacheUtils.retryTopologySafe(call).call();
+ return GridCacheUtils.retryTopologySafe(call);
}
catch (IgniteCheckedException e) {
throw e;
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee955df9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
index be718cf..3f07151 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicLongImpl.java
@@ -23,23 +23,20 @@ import java.io.InvalidObjectException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.ObjectStreamException;
-import java.util.concurrent.Callable;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.EntryProcessorResult;
+import javax.cache.processor.MutableEntry;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
-import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
import org.apache.ignite.lang.IgniteBiTuple;
-import static org.apache.ignite.internal.util.typedef.internal.CU.retryTopologySafe;
-import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
-import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
-
/**
* Cache atomic long implementation.
*/
@@ -55,9 +52,6 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
}
};
- /** Logger. */
- private IgniteLogger log;
-
/** Atomic long name. */
private String name;
@@ -76,126 +70,6 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
/** Cache context. */
private GridCacheContext ctx;
- /** Callable for {@link #get()}. */
- private final Callable<Long> getCall = new Callable<Long>() {
- @Override public Long call() throws Exception {
- GridCacheAtomicLongValue val = atomicView.get(key);
-
- if (val == null)
- throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
-
- return val.get();
- }
- };
-
- /** Callable for {@link #incrementAndGet()}. */
- private final Callable<Long> incAndGetCall = retryTopologySafe(new Callable<Long>() {
- @Override public Long call() throws Exception {
- try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheAtomicLongValue val = atomicView.get(key);
-
- if (val == null)
- throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
-
- long retVal = val.get() + 1;
-
- val.set(retVal);
-
- atomicView.put(key, val);
-
- tx.commit();
-
- return retVal;
- }
- catch (Error | Exception e) {
- U.error(log, "Failed to increment and get: " + this, e);
-
- throw e;
- }
- }
- });
-
- /** Callable for {@link #getAndIncrement()}. */
- private final Callable<Long> getAndIncCall = retryTopologySafe(new Callable<Long>() {
- @Override public Long call() throws Exception {
- try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheAtomicLongValue val = atomicView.get(key);
-
- if (val == null)
- throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
-
- long retVal = val.get();
-
- val.set(retVal + 1);
-
- atomicView.put(key, val);
-
- tx.commit();
-
- return retVal;
- }
- catch (Error | Exception e) {
- U.error(log, "Failed to get and increment: " + this, e);
-
- throw e;
- }
- }
- });
-
- /** Callable for {@link #decrementAndGet()}. */
- private final Callable<Long> decAndGetCall = retryTopologySafe(new Callable<Long>() {
- @Override public Long call() throws Exception {
- try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheAtomicLongValue val = atomicView.get(key);
-
- if (val == null)
- throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
-
- long retVal = val.get() - 1;
-
- val.set(retVal);
-
- atomicView.put(key, val);
-
- tx.commit();
-
- return retVal;
- }
- catch (Error | Exception e) {
- U.error(log, "Failed to decrement and get: " + this, e);
-
- throw e;
- }
- }
- });
-
- /** Callable for {@link #getAndDecrement()}. */
- private final Callable<Long> getAndDecCall = retryTopologySafe(new Callable<Long>() {
- @Override public Long call() throws Exception {
- try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheAtomicLongValue val = atomicView.get(key);
-
- if (val == null)
- throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
-
- long retVal = val.get();
-
- val.set(retVal - 1);
-
- atomicView.put(key, val);
-
- tx.commit();
-
- return retVal;
- }
- catch (Error | Exception e) {
- U.error(log, "Failed to get and decrement and get: " + this, e);
-
- throw e;
- }
- }
- });
-
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -211,8 +85,10 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
* @param atomicView Atomic projection.
* @param ctx CacheContext.
*/
- public GridCacheAtomicLongImpl(String name, GridCacheInternalKey key,
- IgniteInternalCache<GridCacheInternalKey, GridCacheAtomicLongValue> atomicView, GridCacheContext ctx) {
+ public GridCacheAtomicLongImpl(String name,
+ GridCacheInternalKey key,
+ IgniteInternalCache<GridCacheInternalKey, GridCacheAtomicLongValue> atomicView,
+ GridCacheContext ctx) {
assert key != null;
assert atomicView != null;
assert ctx != null;
@@ -222,8 +98,6 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
this.key = key;
this.atomicView = atomicView;
this.name = name;
-
- log = ctx.logger(getClass());
}
/** {@inheritDoc} */
@@ -236,7 +110,12 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
checkRemoved();
try {
- return CU.outTx(getCall, ctx);
+ GridCacheAtomicLongValue val = atomicView.get(key);
+
+ if (val == null)
+ throw new IgniteException("Failed to find atomic long: " + name);
+
+ return val.get();
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -248,7 +127,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
checkRemoved();
try{
- return CU.outTx(incAndGetCall, ctx);
+ EntryProcessorResult<Long> res = atomicView.invoke(key, IncrementAndGetProcessor.INSTANCE);
+
+ assert res != null && res.get() != null : res;
+
+ return res.get();
+ }
+ catch (EntryProcessorException e) {
+ throw new IgniteException(e.getMessage(), e);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -260,7 +146,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
checkRemoved();
try {
- return CU.outTx(getAndIncCall, ctx);
+ EntryProcessorResult<Long> res = atomicView.invoke(key, GetAndIncrementProcessor.INSTANCE);
+
+ assert res != null && res.get() != null : res;
+
+ return res.get();
+ }
+ catch (EntryProcessorException e) {
+ throw new IgniteException(e.getMessage(), e);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -272,7 +165,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
checkRemoved();
try {
- return CU.outTx(internalAddAndGet(l), ctx);
+ EntryProcessorResult<Long> res = atomicView.invoke(key, new AddAndGetProcessor(l));
+
+ assert res != null && res.get() != null : res;
+
+ return res.get();
+ }
+ catch (EntryProcessorException e) {
+ throw new IgniteException(e.getMessage(), e);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -284,7 +184,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
checkRemoved();
try {
- return CU.outTx(internalGetAndAdd(l), ctx);
+ EntryProcessorResult<Long> res = atomicView.invoke(key, new GetAndAddProcessor(l));
+
+ assert res != null && res.get() != null : res;
+
+ return res.get();
+ }
+ catch (EntryProcessorException e) {
+ throw new IgniteException(e.getMessage(), e);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -296,7 +203,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
checkRemoved();
try {
- return CU.outTx(decAndGetCall, ctx);
+ EntryProcessorResult<Long> res = atomicView.invoke(key, DecrementAndGetProcessor.INSTANCE);
+
+ assert res != null && res.get() != null : res;
+
+ return res.get();
+ }
+ catch (EntryProcessorException e) {
+ throw new IgniteException(e.getMessage(), e);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -308,7 +222,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
checkRemoved();
try {
- return CU.outTx(getAndDecCall, ctx);
+ EntryProcessorResult<Long> res = atomicView.invoke(key, GetAndDecrementProcessor.INSTANCE);
+
+ assert res != null && res.get() != null : res;
+
+ return res.get();
+ }
+ catch (EntryProcessorException e) {
+ throw new IgniteException(e.getMessage(), e);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -320,7 +241,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
checkRemoved();
try {
- return CU.outTx(internalGetAndSet(l), ctx);
+ EntryProcessorResult<Long> res = atomicView.invoke(key, new GetAndSetProcessor(l));
+
+ assert res != null && res.get() != null : res;
+
+ return res.get();
+ }
+ catch (EntryProcessorException e) {
+ throw new IgniteException(e.getMessage(), e);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -332,7 +260,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
checkRemoved();
try {
- return CU.outTx(internalCompareAndSetAndGet(expVal, newVal) , ctx) == expVal;
+ EntryProcessorResult<Long> res = atomicView.invoke(key, new CompareAndSetProcessor(expVal, newVal));
+
+ assert res != null && res.get() != null : res;
+
+ return res.get() == expVal;
+ }
+ catch (EntryProcessorException e) {
+ throw new IgniteException(e.getMessage(), e);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -348,7 +283,14 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
checkRemoved();
try {
- return CU.outTx(internalCompareAndSetAndGet(expVal, newVal), ctx);
+ EntryProcessorResult<Long> res = atomicView.invoke(key, new CompareAndSetProcessor(expVal, newVal));
+
+ assert res != null && res.get() != null : res;
+
+ return res.get();
+ }
+ catch (EntryProcessorException e) {
+ throw new IgniteException(e.getMessage(), e);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -422,193 +364,335 @@ public final class GridCacheAtomicLongImpl implements GridCacheAtomicLongEx, Ign
}
}
+ /** {@inheritDoc} */
+ @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException {
+ this.atomicView = kctx.cache().atomicsCache();
+ this.ctx = atomicView.context();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(ctx.kernalContext());
+ out.writeUTF(name);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ IgniteBiTuple<GridKernalContext, String> t = stash.get();
+
+ t.set1((GridKernalContext)in.readObject());
+ t.set2(in.readUTF());
+ }
+
/**
- * Method returns callable for execution {@link #addAndGet(long)} operation in async and sync mode.
+ * Reconstructs object on unmarshalling.
*
- * @param l Value will be added to atomic long.
- * @return Callable for execution in async and sync mode.
+ * @return Reconstructed object.
+ * @throws ObjectStreamException Thrown in case of unmarshalling error.
*/
- private Callable<Long> internalAddAndGet(final long l) {
- return retryTopologySafe(new Callable<Long>() {
- @Override public Long call() throws Exception {
- try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheAtomicLongValue val = atomicView.get(key);
+ private Object readResolve() throws ObjectStreamException {
+ try {
+ IgniteBiTuple<GridKernalContext, String> t = stash.get();
- if (val == null)
- throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
+ return t.get1().dataStructures().atomicLong(t.get2(), 0L, false);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.withCause(new InvalidObjectException(e.getMessage()), e);
+ }
+ finally {
+ stash.remove();
+ }
+ }
- long retVal = val.get() + l;
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GridCacheAtomicLongImpl.class, this);
+ }
- val.set(retVal);
+ /**
+ * Entry processor that sets a new atomic long value and returns the value it replaced.
+ */
+ static class GetAndSetProcessor implements
+ CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final long newVal;
+
+ /**
+ * @param newVal New value.
+ */
+ GetAndSetProcessor(long newVal) {
+ this.newVal = newVal;
+ }
- atomicView.put(key, val);
+ /** {@inheritDoc} */
+ @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) {
+ GridCacheAtomicLongValue val = e.getValue();
- tx.commit();
+ if (val == null)
+ throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name());
- return retVal;
- }
- catch (Error | Exception e) {
- U.error(log, "Failed to add and get: " + this, e);
+ long curVal = val.get();
- throw e;
- }
- }
- });
+ e.setValue(new GridCacheAtomicLongValue(newVal));
+
+ return curVal;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GetAndSetProcessor.class, this);
+ }
}
/**
- * Method returns callable for execution {@link #getAndAdd(long)} operation in async and sync mode.
*
- * @param l Value will be added to atomic long.
- * @return Callable for execution in async and sync mode.
*/
- private Callable<Long> internalGetAndAdd(final long l) {
- return retryTopologySafe(new Callable<Long>() {
- @Override public Long call() throws Exception {
- try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheAtomicLongValue val = atomicView.get(key);
-
- if (val == null)
- throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
+ static class GetAndAddProcessor implements
+ CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final long delta;
+
+ /**
+ * @param delta Delta.
+ */
+ GetAndAddProcessor(long delta) {
+ this.delta = delta;
+ }
- long retVal = val.get();
+ /** {@inheritDoc} */
+ @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) {
+ GridCacheAtomicLongValue val = e.getValue();
- val.set(retVal + l);
+ if (val == null)
+ throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name());
- atomicView.put(key, val);
+ long curVal = val.get();
- tx.commit();
+ e.setValue(new GridCacheAtomicLongValue(curVal + delta));
- return retVal;
- }
- catch (Error | Exception e) {
- U.error(log, "Failed to get and add: " + this, e);
+ return curVal;
+ }
- throw e;
- }
- }
- });
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GetAndAddProcessor.class, this);
+ }
}
/**
- * Method returns callable for execution {@link #getAndSet(long)} operation in async and sync mode.
*
- * @param l Value will be added to atomic long.
- * @return Callable for execution in async and sync mode.
*/
- private Callable<Long> internalGetAndSet(final long l) {
- return new Callable<Long>() {
- @Override public Long call() throws Exception {
- try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheAtomicLongValue val = atomicView.get(key);
+ static class AddAndGetProcessor implements
+ CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final long delta;
+
+ /**
+ * @param delta Delta.
+ */
+ AddAndGetProcessor(long delta) {
+ this.delta = delta;
+ }
- if (val == null)
- throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
+ /** {@inheritDoc} */
+ @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) {
+ GridCacheAtomicLongValue val = e.getValue();
- long retVal = val.get();
+ if (val == null)
+ throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name());
- val.set(l);
+ long newVal = val.get() + delta;
- atomicView.put(key, val);
+ e.setValue(new GridCacheAtomicLongValue(newVal));
- tx.commit();
+ return newVal;
+ }
- return retVal;
- }
- catch (Error | Exception e) {
- U.error(log, "Failed to get and set: " + this, e);
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(AddAndGetProcessor.class, this);
+ }
+ }
- throw e;
- }
- }
- };
+ /**
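+ * Entry processor that sets the new value only if the value read equals the expected one; returns the value read.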
+ */
+ static class CompareAndSetProcessor implements
+ CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final long expVal;
+
+ /** */
+ private final long newVal;
+
+ /**
+ * @param expVal Expected value.
+ * @param newVal New value.
+ */
+ CompareAndSetProcessor(long expVal, long newVal) {
+ this.expVal = expVal;
+ this.newVal = newVal;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) {
+ GridCacheAtomicLongValue val = e.getValue();
+
+ if (val == null)
+ throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name());
+
+ long curVal = val.get();
+
+ if (curVal == expVal)
+ e.setValue(new GridCacheAtomicLongValue(newVal));
+
+ return curVal;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CompareAndSetProcessor.class, this);
+ }
}
/**
- * Method returns callable for execution {@link #compareAndSetAndGet(long, long)}
- * operation in async and sync mode.
*
- * @param expVal Expected atomic long value.
- * @param newVal New atomic long value.
- * @return Callable for execution in async and sync mode.
*/
- private Callable<Long> internalCompareAndSetAndGet(final long expVal, final long newVal) {
- return new Callable<Long>() {
- @Override public Long call() throws Exception {
- try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheAtomicLongValue val = atomicView.get(key);
+ static class GetAndIncrementProcessor implements
+ CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> {
+ /** */
+ private static final long serialVersionUID = 0L;
- if (val == null)
- throw new IgniteCheckedException("Failed to find atomic long with given name: " + name);
+ /** */
+ private static final GetAndIncrementProcessor INSTANCE = new GetAndIncrementProcessor();
- long retVal = val.get();
+ /** {@inheritDoc} */
+ @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) {
+ GridCacheAtomicLongValue val = e.getValue();
- if (retVal == expVal) {
- val.set(newVal);
+ if (val == null)
+ throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name());
- atomicView.getAndPut(key, val);
+ long ret = val.get();
- tx.commit();
- }
+ e.setValue(new GridCacheAtomicLongValue(ret + 1));
- return retVal;
- }
- catch (Error | Exception e) {
- U.error(log, "Failed to compare and set: " + this, e);
+ return ret;
+ }
- throw e;
- }
- }
- };
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GetAndIncrementProcessor.class, this);
+ }
}
- /** {@inheritDoc} */
- @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException {
- this.atomicView = kctx.cache().atomicsCache();
- this.ctx = atomicView.context();
- }
+ /**
+ * Entry processor that adds one to the atomic long value and returns the updated value.
+ */
+ static class IncrementAndGetProcessor implements
+ CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> {
+ /** */
+ private static final long serialVersionUID = 0L;
- /** {@inheritDoc} */
- @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException {
+ /** */
+ private static final IncrementAndGetProcessor INSTANCE = new IncrementAndGetProcessor();
- }
+ /** {@inheritDoc} */
+ @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) {
+ GridCacheAtomicLongValue val = e.getValue();
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeObject(ctx.kernalContext());
- out.writeUTF(name);
- }
+ if (val == null)
+ throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name());
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- IgniteBiTuple<GridKernalContext, String> t = stash.get();
+ long newVal = val.get() + 1;
- t.set1((GridKernalContext)in.readObject());
- t.set2(in.readUTF());
+ e.setValue(new GridCacheAtomicLongValue(newVal));
+
+ return newVal;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IncrementAndGetProcessor.class, this);
+ }
}
/**
- * Reconstructs object on unmarshalling.
*
- * @return Reconstructed object.
- * @throws ObjectStreamException Thrown in case of unmarshalling error.
*/
- private Object readResolve() throws ObjectStreamException {
- try {
- IgniteBiTuple<GridKernalContext, String> t = stash.get();
+ static class GetAndDecrementProcessor implements
+ CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> {
+ /** */
+ private static final long serialVersionUID = 0L;
- return t.get1().dataStructures().atomicLong(t.get2(), 0L, false);
- }
- catch (IgniteCheckedException e) {
- throw U.withCause(new InvalidObjectException(e.getMessage()), e);
+ /** */
+ private static final GetAndDecrementProcessor INSTANCE = new GetAndDecrementProcessor();
+
+ /** {@inheritDoc} */
+ @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) {
+ GridCacheAtomicLongValue val = e.getValue();
+
+ if (val == null)
+ throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name());
+
+ long ret = val.get();
+
+ e.setValue(new GridCacheAtomicLongValue(ret - 1));
+
+ return ret;
}
- finally {
- stash.remove();
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(GetAndDecrementProcessor.class, this);
}
}
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridCacheAtomicLongImpl.class, this);
+ /**
+ * Entry processor that subtracts one from the atomic long value and returns the updated value.
+ */
+ static class DecrementAndGetProcessor implements
+ CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicLongValue, Long> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private static final DecrementAndGetProcessor INSTANCE = new DecrementAndGetProcessor();
+
+ /** {@inheritDoc} */
+ @Override public Long process(MutableEntry<GridCacheInternalKey, GridCacheAtomicLongValue> e, Object... args) {
+ GridCacheAtomicLongValue val = e.getValue();
+
+ if (val == null)
+ throw new EntryProcessorException("Failed to find atomic long: " + e.getKey().name());
+
+ long newVal = val.get() - 1;
+
+ e.setValue(new GridCacheAtomicLongValue(newVal));
+
+ return newVal;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DecrementAndGetProcessor.class, this);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee955df9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
index 4365468..b7dc007 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java
@@ -23,24 +23,21 @@ import java.io.InvalidObjectException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.ObjectStreamException;
-import java.util.concurrent.Callable;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.EntryProcessorResult;
+import javax.cache.processor.MutableEntry;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
import org.apache.ignite.lang.IgniteBiTuple;
-import static org.apache.ignite.internal.processors.cache.GridCacheUtils.retryTopologySafe;
-import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
-import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
-
/**
* Cache atomic reference implementation.
*/
@@ -56,9 +53,6 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
}
};
- /** Logger. */
- private IgniteLogger log;
-
/** Atomic reference name. */
private String name;
@@ -77,18 +71,6 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
/** Cache context. */
private GridCacheContext ctx;
- /** Callable for {@link #get} operation */
- private final Callable<T> getCall = new Callable<T>() {
- @Override public T call() throws Exception {
- GridCacheAtomicReferenceValue<T> ref = atomicView.get(key);
-
- if (ref == null)
- throw new IgniteCheckedException("Failed to find atomic reference with given name: " + name);
-
- return ref.get();
- }
- };
-
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -117,8 +99,6 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
this.key = key;
this.atomicView = atomicView;
this.name = name;
-
- log = ctx.logger(getClass());
}
/** {@inheritDoc} */
@@ -131,7 +111,12 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
checkRemoved();
try {
- return CU.outTx(getCall, ctx);
+ GridCacheAtomicReferenceValue<T> ref = atomicView.get(key);
+
+ if (ref == null)
+ throw new IgniteCheckedException("Failed to find atomic reference with given name: " + name);
+
+ return ref.get();
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -143,7 +128,10 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
checkRemoved();
try {
- CU.outTx(internalSet(val), ctx);
+ atomicView.invoke(key, new ReferenceSetEntryProcessor<>(val));
+ }
+ catch (EntryProcessorException e) {
+ throw new IgniteException(e.getMessage(), e);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -152,20 +140,42 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
/** {@inheritDoc} */
@Override public boolean compareAndSet(T expVal, T newVal) {
- return compareAndSetAndGet(newVal, expVal) == expVal;
+ try {
+ EntryProcessorResult<Boolean> res =
+ atomicView.invoke(key, new ReferenceCompareAndSetEntryProcessor<>(expVal, newVal));
+
+ assert res != null && res.get() != null : res;
+
+ return res.get();
+ }
+ catch (EntryProcessorException e) {
+ throw new IgniteException(e.getMessage(), e);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
}
/**
* Compares current value with specified value for equality and, if they are equal, replaces current value.
*
* @param newVal New value to set.
+ * @param expVal Expected value.
* @return Original value.
*/
public T compareAndSetAndGet(T newVal, T expVal) {
checkRemoved();
try {
- return CU.outTx(internalCompareAndSetAndGet(expVal, newVal), ctx);
+ EntryProcessorResult<T> res =
+ atomicView.invoke(key, new ReferenceCompareAndSetAndGetEntryProcessor<T>(expVal, newVal));
+
+ assert res != null;
+
+ return res.get();
+ }
+ catch (EntryProcessorException e) {
+ throw new IgniteException(e.getMessage(), e);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -205,82 +215,6 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
}
}
- /**
- * Method returns callable for execution {@link #set(Object)} operation in async and sync mode.
- *
- * @param val Value will be set in reference .
- * @return Callable for execution in async and sync mode.
- */
- private Callable<Boolean> internalSet(final T val) {
- return retryTopologySafe(new Callable<Boolean>() {
- @Override public Boolean call() throws Exception {
- try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheAtomicReferenceValue<T> ref = atomicView.get(key);
-
- if (ref == null)
- throw new IgniteCheckedException("Failed to find atomic reference with given name: " + name);
-
- ref.set(val);
-
- atomicView.put(key, ref);
-
- tx.commit();
-
- return true;
- }
- catch (Error | Exception e) {
- U.error(log, "Failed to set value [val=" + val + ", atomicReference=" + this + ']', e);
-
- throw e;
- }
- }
- });
- }
-
- /**
- * Conditionally sets the new value. It will be set if {@code expValPred} is
- * evaluate to {@code true}.
- *
- * @param expVal Expected value.
- * @param newVal New value.
- * @return Callable for execution in async and sync mode.
- */
- private Callable<T> internalCompareAndSetAndGet(final T expVal, final T newVal) {
- return retryTopologySafe(new Callable<T>() {
- @Override public T call() throws Exception {
- try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheAtomicReferenceValue<T> ref = atomicView.get(key);
-
- if (ref == null)
- throw new IgniteCheckedException("Failed to find atomic reference with given name: " + name);
-
- T origVal = ref.get();
-
- if (!F.eq(expVal, origVal)) {
- tx.setRollbackOnly();
-
- return origVal;
- }
- else {
- ref.set(newVal);
-
- atomicView.getAndPut(key, ref);
-
- tx.commit();
-
- return expVal;
- }
- }
- catch (Error | Exception e) {
- U.error(log, "Failed to compare and value [expVal=" + expVal + ", newVal" +
- newVal + ", atomicReference" + this + ']', e);
-
- throw e;
- }
- }
- });
- }
-
/** {@inheritDoc} */
@Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException {
this.atomicView = kctx.cache().atomicsCache();
@@ -289,7 +223,7 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
/** {@inheritDoc} */
@Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException {
-
+ // No-op.
}
/**
@@ -363,6 +297,136 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef
}
}
+ /**
+ * Entry processor that replaces the atomic reference value; throws if the entry has no value.
+ */
+ static class ReferenceSetEntryProcessor<T> implements
+ CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicReferenceValue<T>, Void> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final T newVal;
+
+ /**
+ * @param newVal New value.
+ */
+ ReferenceSetEntryProcessor(T newVal) {
+ this.newVal = newVal;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void process(MutableEntry<GridCacheInternalKey, GridCacheAtomicReferenceValue<T>> e,
+ Object... args) {
+ GridCacheAtomicReferenceValue val = e.getValue();
+
+ if (val == null)
+ throw new EntryProcessorException("Failed to find atomic reference with given name: " + e.getKey().name());
+
+ e.setValue(new GridCacheAtomicReferenceValue<>(newVal));
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ReferenceSetEntryProcessor.class, this);
+ }
+ }
+
+ /**
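+ * Entry processor that replaces the reference value if {@code F.eq} matches it to the expected one; returns whether it did.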
+ */
+ static class ReferenceCompareAndSetEntryProcessor<T> implements
+ CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicReferenceValue<T>, Boolean> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final T expVal;
+
+ /** */
+ private final T newVal;
+
+ /**
+ * @param expVal Expected value.
+ * @param newVal New value.
+ */
+ ReferenceCompareAndSetEntryProcessor(T expVal, T newVal) {
+ this.expVal = expVal;
+ this.newVal = newVal;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Boolean process(MutableEntry<GridCacheInternalKey, GridCacheAtomicReferenceValue<T>> e,
+ Object... args) {
+ GridCacheAtomicReferenceValue<T> val = e.getValue();
+
+ if (val == null)
+ throw new EntryProcessorException("Failed to find atomic reference with given name: " + e.getKey().name());
+
+ T curVal = val.get();
+
+ if (F.eq(expVal, curVal)) {
+ e.setValue(new GridCacheAtomicReferenceValue<T>(newVal));
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ReferenceCompareAndSetEntryProcessor.class, this);
+ }
+ }
+
+ /**
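+ * Entry processor that replaces the reference value if {@code F.eq} matches it to the expected one; returns the value read.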
+ */
+ static class ReferenceCompareAndSetAndGetEntryProcessor<T> implements
+ CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicReferenceValue<T>, T> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private final T expVal;
+
+ /** */
+ private final T newVal;
+
+ /**
+ * @param expVal Expected value.
+ * @param newVal New value.
+ */
+ ReferenceCompareAndSetAndGetEntryProcessor(T expVal, T newVal) {
+ this.expVal = expVal;
+ this.newVal = newVal;
+ }
+
+ /** {@inheritDoc} */
+ @Override public T process(MutableEntry<GridCacheInternalKey, GridCacheAtomicReferenceValue<T>> e,
+ Object... args) {
+ GridCacheAtomicReferenceValue<T> val = e.getValue();
+
+ if (val == null)
+ throw new EntryProcessorException("Failed to find atomic reference with given name: " + e.getKey().name());
+
+ T curVal = val.get();
+
+ if (F.eq(expVal, curVal))
+ e.setValue(new GridCacheAtomicReferenceValue<T>(newVal));
+
+ return curVal;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ReferenceCompareAndSetAndGetEntryProcessor.class, this);
+ }
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridCacheAtomicReferenceImpl.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee955df9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
index 0661b11..d14bb47 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java
@@ -32,11 +32,9 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
-import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -256,7 +254,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
if (updateGuard.compareAndSet(false, true)) {
try {
try {
- return updateCall.call();
+ return retryTopologySafe(updateCall);
}
catch (IgniteCheckedException | IgniteException | IllegalStateException e) {
throw e;
@@ -303,86 +301,6 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
}
}
- /**
- * Asynchronous sequence update operation. Will add given amount to the sequence value.
- *
- * @param l Increment amount.
- * @param updateCall Cache call that will update sequence reservation count in accordance with l.
- * @param updated If {@code true}, will return sequence value after update, otherwise will return sequence value
- * prior to update.
- * @return Future indicating sequence value.
- * @throws IgniteCheckedException If update failed.
- */
- @SuppressWarnings("SignalWithoutCorrespondingAwait")
- private IgniteInternalFuture<Long> internalUpdateAsync(long l, @Nullable Callable<Long> updateCall, boolean updated)
- throws IgniteCheckedException {
- checkRemoved();
-
- A.ensure(l > 0, " Parameter mustn't be less then 1: " + l);
-
- lock.lock();
-
- try {
- // If reserved range isn't exhausted.
- if (locVal + l <= upBound) {
- long curVal = locVal;
-
- locVal += l;
-
- return new GridFinishedFuture<>(updated ? locVal : curVal);
- }
- }
- finally {
- lock.unlock();
- }
-
- if (updateCall == null)
- updateCall = internalUpdate(l, updated);
-
- while (true) {
- if (updateGuard.compareAndSet(false, true)) {
- try {
- // This call must be outside lock.
- return ctx.closures().callLocalSafe(updateCall, true);
- }
- finally {
- lock.lock();
-
- try {
- updateGuard.set(false);
-
- cond.signalAll();
- }
- finally {
- lock.unlock();
- }
- }
- }
- else {
- lock.lock();
-
- try {
- while (locVal >= upBound && updateGuard.get())
- U.await(cond, 500, MILLISECONDS);
-
- checkRemoved();
-
- // If reserved range isn't exhausted.
- if (locVal + l <= upBound) {
- long curVal = locVal;
-
- locVal += l;
-
- return new GridFinishedFuture<>(updated ? locVal : curVal);
- }
- }
- finally {
- lock.unlock();
- }
- }
- }
- }
-
/** Get local batch size for this sequences.
*
* @return Sequence batch size.
@@ -485,7 +403,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
*/
@SuppressWarnings("TooBroadScope")
private Callable<Long> internalUpdate(final long l, final boolean updated) {
- return retryTopologySafe(new Callable<Long>() {
+ return new Callable<Long>() {
@Override public Long call() throws Exception {
try (GridNearTxLocal tx = CU.txStartInternal(ctx, seqView, PESSIMISTIC, REPEATABLE_READ)) {
GridCacheAtomicSequenceValue seq = seqView.get(key);
@@ -556,7 +474,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc
throw e;
}
}
- });
+ };
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee955df9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
index 09cea43..3f14942 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicStampedImpl.java
@@ -23,25 +23,20 @@ import java.io.InvalidObjectException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.ObjectStreamException;
-import java.util.concurrent.Callable;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.EntryProcessorResult;
+import javax.cache.processor.MutableEntry;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
-import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
-import org.apache.ignite.internal.util.F0;
import org.apache.ignite.internal.util.tostring.GridToStringBuilder;
-import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteClosure;
-import org.apache.ignite.lang.IgnitePredicate;
-
-import static org.apache.ignite.internal.util.typedef.internal.CU.retryTopologySafe;
-import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
-import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
/**
* Cache atomic stamped implementation.
@@ -58,9 +53,6 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
}
};
- /** Logger. */
- private IgniteLogger log;
-
/** Atomic stamped name. */
private String name;
@@ -79,42 +71,6 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
/** Cache context. */
private GridCacheContext ctx;
- /** Callable for {@link #get()} operation */
- private final Callable<IgniteBiTuple<T, S>> getCall = retryTopologySafe(new Callable<IgniteBiTuple<T, S>>() {
- @Override public IgniteBiTuple<T, S> call() throws Exception {
- GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key);
-
- if (stmp == null)
- throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name);
-
- return stmp.get();
- }
- });
-
- /** Callable for {@link #value()} operation */
- private final Callable<T> valCall = retryTopologySafe(new Callable<T>() {
- @Override public T call() throws Exception {
- GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key);
-
- if (stmp == null)
- throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name);
-
- return stmp.value();
- }
- });
-
- /** Callable for {@link #stamp()} operation */
- private final Callable<S> stampCall = retryTopologySafe(new Callable<S>() {
- @Override public S call() throws Exception {
- GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key);
-
- if (stmp == null)
- throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name);
-
- return stmp.stamp();
- }
- });
-
/**
* Empty constructor required by {@link Externalizable}.
*/
@@ -130,8 +86,10 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
* @param atomicView Atomic projection.
* @param ctx Cache context.
*/
- public GridCacheAtomicStampedImpl(String name, GridCacheInternalKey key, IgniteInternalCache<GridCacheInternalKey,
- GridCacheAtomicStampedValue<T, S>> atomicView, GridCacheContext ctx) {
+ public GridCacheAtomicStampedImpl(String name,
+ GridCacheInternalKey key,
+ IgniteInternalCache<GridCacheInternalKey, GridCacheAtomicStampedValue<T, S>> atomicView,
+ GridCacheContext ctx) {
assert key != null;
assert atomicView != null;
assert ctx != null;
@@ -141,8 +99,6 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
this.key = key;
this.atomicView = atomicView;
this.name = name;
-
- log = ctx.logger(getClass());
}
/** {@inheritDoc} */
@@ -155,7 +111,12 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
checkRemoved();
try {
- return CU.outTx(getCall, ctx);
+ GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key);
+
+ if (stmp == null)
+ throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name);
+
+ return stmp.get();
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -167,7 +128,10 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
checkRemoved();
try {
- CU.outTx(internalSet(val, stamp), ctx);
+ atomicView.invoke(key, new StampedSetEntryProcessor<>(val, stamp));
+ }
+ catch (EntryProcessorException e) {
+ throw new IgniteException(e.getMessage(), e);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -179,8 +143,15 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
checkRemoved();
try {
- return CU.outTx(internalCompareAndSet(F0.equalTo(expVal), wrapperClosure(newVal),
- F0.equalTo(expStamp), wrapperClosure(newStamp)), ctx);
+ EntryProcessorResult<Boolean> res =
+ atomicView.invoke(key, new StampedCompareAndSetEntryProcessor<>(expVal, expStamp, newVal, newStamp));
+
+ assert res != null && res.get() != null : res;
+
+ return res.get();
+ }
+ catch (EntryProcessorException e) {
+ throw new IgniteException(e.getMessage(), e);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -192,7 +163,12 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
checkRemoved();
try {
- return CU.outTx(stampCall, ctx);
+ GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key);
+
+ if (stmp == null)
+ throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name);
+
+ return stmp.stamp();
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -204,7 +180,12 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
checkRemoved();
try {
- return CU.outTx(valCall, ctx);
+ GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key);
+
+ if (stmp == null)
+ throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name);
+
+ return stmp.value();
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -244,100 +225,6 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
}
}
- /**
- * Method make wrapper closure for existing value.
- *
- * @param val Value.
- * @return Closure.
- */
- private <N> IgniteClosure<N, N> wrapperClosure(final N val) {
- return new IgniteClosure<N, N>() {
- @Override public N apply(N e) {
- return val;
- }
- };
- }
-
- /**
- * Method returns callable for execution {@link #set(Object,Object)}} operation in async and sync mode.
- *
- * @param val Value will be set in the atomic stamped.
- * @param stamp Stamp will be set in the atomic stamped.
- * @return Callable for execution in async and sync mode.
- */
- private Callable<Boolean> internalSet(final T val, final S stamp) {
- return retryTopologySafe(new Callable<Boolean>() {
- @Override public Boolean call() throws Exception {
- try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key);
-
- if (stmp == null)
- throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name);
-
- stmp.set(val, stamp);
-
- atomicView.put(key, stmp);
-
- tx.commit();
-
- return true;
- }
- catch (Error | Exception e) {
- U.error(log, "Failed to set [val=" + val + ", stamp=" + stamp + ", atomicStamped=" + this + ']', e);
-
- throw e;
- }
- }
- });
- }
-
- /**
- * Conditionally asynchronously sets the new value and new stamp. They will be set if
- * {@code expValPred} and {@code expStampPred} both evaluate to {@code true}.
- *
- * @param expValPred Predicate which should evaluate to {@code true} for value to be set
- * @param newValClos Closure generates new value.
- * @param expStampPred Predicate which should evaluate to {@code true} for value to be set
- * @param newStampClos Closure generates new stamp value.
- * @return Callable for execution in async and sync mode.
- */
- private Callable<Boolean> internalCompareAndSet(final IgnitePredicate<T> expValPred,
- final IgniteClosure<T, T> newValClos, final IgnitePredicate<S> expStampPred,
- final IgniteClosure<S, S> newStampClos) {
- return retryTopologySafe(new Callable<Boolean>() {
- @Override public Boolean call() throws Exception {
- try (GridNearTxLocal tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheAtomicStampedValue<T, S> stmp = atomicView.get(key);
-
- if (stmp == null)
- throw new IgniteCheckedException("Failed to find atomic stamped with given name: " + name);
-
- if (!(expValPred.apply(stmp.value()) && expStampPred.apply(stmp.stamp()))) {
- tx.setRollbackOnly();
-
- return false;
- }
- else {
- stmp.set(newValClos.apply(stmp.value()), newStampClos.apply(stmp.stamp()));
-
- atomicView.getAndPut(key, stmp);
-
- tx.commit();
-
- return true;
- }
- }
- catch (Error | Exception e) {
- U.error(log, "Failed to compare and set [expValPred=" + expValPred + ", newValClos=" +
- newValClos + ", expStampPred=" + expStampPred + ", newStampClos=" + newStampClos +
- ", atomicStamped=" + this + ']', e);
-
- throw e;
- }
- }
- });
- }
-
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(ctx.kernalContext());
@@ -418,6 +305,104 @@ public final class GridCacheAtomicStampedImpl<T, S> implements GridCacheAtomicSt
return new IllegalStateException("Atomic stamped was removed from cache: " + name);
}
+ /**
+ * Entry processor that overwrites the stored stamped value with a new value/stamp pair; fails if the entry is empty.
+ */
+ static class StampedSetEntryProcessor<T, S> implements
+ CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicStampedValue<T, S>, Void> {
+ /** Serialization version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Value written by {@link #process}. */
+ private final T newVal;
+
+ /** Stamp written by {@link #process}. */
+ private final S newStamp;
+
+ /**
+ * @param newVal New value.
+ * @param newStamp New stamp value.
+ */
+ StampedSetEntryProcessor(T newVal, S newStamp) {
+ this.newVal = newVal;
+ this.newStamp = newStamp;
+ }
+
+ /** {@inheritDoc} Returns {@code null}; throws {@link EntryProcessorException} if the entry has no value. */
+ @Override public Void process(MutableEntry<GridCacheInternalKey, GridCacheAtomicStampedValue<T, S>> e,
+ Object... args) {
+ GridCacheAtomicStampedValue val = e.getValue();
+
+ if (val == null)
+ throw new EntryProcessorException("Failed to find atomic stamped with given name: " + e.getKey().name());
+
+ e.setValue(new GridCacheAtomicStampedValue<>(newVal, newStamp));
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return GridToStringBuilder.toString(StampedSetEntryProcessor.class, this);
+ }
+ }
+
+ /**
+ * Entry processor that replaces the stored value/stamp pair only when both equal the expected value and stamp.
+ */
+ static class StampedCompareAndSetEntryProcessor<T, S> implements
+ CacheEntryProcessor<GridCacheInternalKey, GridCacheAtomicStampedValue<T, S>, Boolean> {
+ /** Serialization version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Value the entry must currently hold for the update to happen. */
+ private final T expVal;
+
+ /** Stamp the entry must currently hold for the update to happen. */
+ private final S expStamp;
+
+ /** Value written when the comparison succeeds. */
+ private final T newVal;
+
+ /** Stamp written when the comparison succeeds. */
+ private final S newStamp;
+
+ /**
+ * @param expVal Expected value.
+ * @param expStamp Expected stamp.
+ * @param newVal New value.
+ * @param newStamp New stamp value.
+ */
+ StampedCompareAndSetEntryProcessor(T expVal, S expStamp, T newVal, S newStamp) {
+ this.expVal = expVal;
+ this.expStamp = expStamp;
+ this.newVal = newVal;
+ this.newStamp = newStamp;
+ }
+
+ /** {@inheritDoc} Returns {@code true} if the pair was replaced; throws if the entry has no value. */
+ @Override public Boolean process(MutableEntry<GridCacheInternalKey, GridCacheAtomicStampedValue<T, S>> e,
+ Object... args) {
+ GridCacheAtomicStampedValue val = e.getValue();
+
+ if (val == null)
+ throw new EntryProcessorException("Failed to find atomic stamped with given name: " + e.getKey().name());
+
+ if (F.eq(expVal, val.value()) && F.eq(expStamp, val.stamp())) {
+ e.setValue(new GridCacheAtomicStampedValue<>(newVal, newStamp));
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return GridToStringBuilder.toString(StampedCompareAndSetEntryProcessor.class, this);
+ }
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return GridToStringBuilder.toString(GridCacheAtomicStampedImpl.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/ee955df9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
index ea80cc5..86e99a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheCountDownLatchImpl.java
@@ -152,7 +152,9 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
/** {@inheritDoc} */
@Override public int count() {
try {
- return CU.outTx(new GetCountCallable(), ctx);
+ GridCacheCountDownLatchValue latchVal = latchView.get(key);
+
+ return latchVal == null ? 0 : latchVal.get();
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -208,7 +210,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
A.ensure(val > 0, "val should be positive");
try {
- return CU.outTx(retryTopologySafe(new CountDownCallable(val)), ctx);
+ return retryTopologySafe(new CountDownCallable(val));
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -218,7 +220,7 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
/** {@inheritDoc}*/
@Override public void countDownAll() {
try {
- CU.outTx(retryTopologySafe(new CountDownCallable(0)), ctx);
+ retryTopologySafe(new CountDownCallable(0));
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -255,23 +257,22 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
int state = initGuard.get();
if (state != READY_LATCH_STATE) {
- /** Internal latch is not fully initialized yet. Remember latest latch value. */
+ /* Internal latch is not fully initialized yet. Remember latest latch value. */
lastLatchVal = cnt;
return;
}
- /** 'synchronized' statement guarantees visibility of internalLatch. No need to make it volatile. */
+ /* 'synchronized' statement guarantees visibility of internalLatch. No need to make it volatile. */
latch0 = internalLatch;
}
- /** Internal latch is fully initialized and ready for the usage. */
+ /* Internal latch is fully initialized and ready for the usage. */
assert latch0 != null;
while (latch0.getCount() > cnt)
latch0.countDown();
-
}
/**
@@ -280,27 +281,24 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
private void initializeLatch() throws IgniteCheckedException {
if (initGuard.compareAndSet(UNINITIALIZED_LATCH_STATE, CREATING_LATCH_STATE)) {
try {
- internalLatch = CU.outTx(
- retryTopologySafe(new Callable<CountDownLatch>() {
- @Override public CountDownLatch call() throws Exception {
- try (GridNearTxLocal tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) {
- GridCacheCountDownLatchValue val = latchView.get(key);
+ internalLatch = retryTopologySafe(new Callable<CountDownLatch>() {
+ @Override public CountDownLatch call() throws Exception {
+ try (GridNearTxLocal tx = CU.txStartInternal(ctx, latchView, PESSIMISTIC, REPEATABLE_READ)) {
+ GridCacheCountDownLatchValue val = latchView.get(key);
- if (val == null) {
- if (log.isDebugEnabled())
- log.debug("Failed to find count down latch with given name: " + name);
+ if (val == null) {
+ if (log.isDebugEnabled())
+ log.debug("Failed to find count down latch with given name: " + name);
- return new CountDownLatch(0);
- }
+ return new CountDownLatch(0);
+ }
- tx.commit();
+ tx.commit();
- return new CountDownLatch(val.get());
- }
+ return new CountDownLatch(val.get());
}
- }),
- ctx
- );
+ }
+ });
synchronized (initGuard) {
if (lastLatchVal != null) {
@@ -392,18 +390,6 @@ public final class GridCacheCountDownLatchImpl implements GridCacheCountDownLatc
/**
*
*/
- private class GetCountCallable implements Callable<Integer> {
- /** {@inheritDoc} */
- @Override public Integer call() throws Exception {
- GridCacheCountDownLatchValue latchVal = latchView.get(key);
-
- return latchVal == null ? 0 : latchVal.get();
- }
- }
-
- /**
- *
- */
private class CountDownCallable implements Callable<Integer> {
/** Value to count down on (if 0 then latch is counted down to 0). */
private final int val;
[3/5] ignite git commit: IGNITE-3549: IGFS: Switched "accessTime" and
"modification" time in setTimes() method. This closes #1830.
Posted by sb...@apache.org.
IGNITE-3549: IGFS: Switched "accessTime" and "modification" time in setTimes() method. This closes #1830.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/457dcdbb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/457dcdbb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/457dcdbb
Branch: refs/heads/ignite-1794
Commit: 457dcdbba78bcbd63f50c5f83433c4935651fe17
Parents: f440480
Author: tledkov-gridgain <tl...@gridgain.com>
Authored: Thu Apr 20 13:32:06 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Thu Apr 20 13:32:06 2017 +0300
----------------------------------------------------------------------
.../java/org/apache/ignite/IgniteFileSystem.java | 4 ++--
.../igfs/secondary/IgfsSecondaryFileSystem.java | 4 ++--
.../local/LocalIgfsSecondaryFileSystem.java | 2 +-
.../internal/processors/igfs/IgfsAsyncImpl.java | 4 ++--
.../ignite/internal/processors/igfs/IgfsImpl.java | 6 +++---
.../internal/processors/igfs/IgfsIpcHandler.java | 2 +-
.../internal/processors/igfs/IgfsMetaManager.java | 6 +++---
.../igfs/IgfsSecondaryFileSystemImpl.java | 4 ++--
.../igfs/client/IgfsClientSetTimesCallable.java | 2 +-
.../DefaultIgfsSecondaryFileSystemTestAdapter.java | 2 +-
.../processors/igfs/IgfsAbstractSelfTest.java | 17 ++++++++---------
.../processors/igfs/IgfsDualAbstractSelfTest.java | 6 +++---
.../IgfsLocalSecondaryFileSystemTestAdapter.java | 2 +-
.../ignite/internal/processors/igfs/IgfsMock.java | 2 +-
.../IgfsSecondaryFileSystemInjectionSelfTest.java | 2 +-
.../igfs/IgfsSecondaryFileSystemTestAdapter.java | 2 +-
.../fs/IgniteHadoopIgfsSecondaryFileSystem.java | 4 ++--
.../HadoopIgfsSecondaryFileSystemDelegateImpl.java | 2 +-
.../hadoop/impl/igfs/HadoopIgfsInProc.java | 2 +-
.../HadoopIgfsSecondaryFileSystemTestAdapter.java | 2 +-
20 files changed, 38 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/457dcdbb/modules/core/src/main/java/org/apache/ignite/IgniteFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteFileSystem.java b/modules/core/src/main/java/org/apache/ignite/IgniteFileSystem.java
index 9c3f9dd..d611871 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteFileSystem.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteFileSystem.java
@@ -202,13 +202,13 @@ public interface IgniteFileSystem extends IgniteAsyncSupport {
* corresponding time will not be changed.
*
* @param path Path to update.
- * @param accessTime Optional last access time to set. Value {@code -1} does not update access time.
* @param modificationTime Optional last modification time to set. Value {@code -1} does not update
* modification time.
+ * @param accessTime Optional last access time to set. Value {@code -1} does not update access time.
* @throws IgfsPathNotFoundException If target was not found.
* @throws IgniteException If error occurred.
*/
- public void setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteException;
+ public void setTimes(IgfsPath path, long modificationTime, long accessTime) throws IgniteException;
/**
* Gets affinity block locations for data blocks of the file, i.e. the nodes, on which the blocks
http://git-wip-us.apache.org/repos/asf/ignite/blob/457dcdbb/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
index 76ba454..4de0cb5 100644
--- a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
+++ b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/IgfsSecondaryFileSystem.java
@@ -199,11 +199,11 @@ public interface IgfsSecondaryFileSystem {
* Set times for the given path.
*
* @param path Path.
- * @param accessTime Access time.
* @param modificationTime Modification time.
+ * @param accessTime Access time.
* @throws IgniteException If failed.
*/
- public void setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteException;
+ public void setTimes(IgfsPath path, long modificationTime, long accessTime) throws IgniteException;
/**
* Get affinity block locations for data blocks of the file. In case {@code maxLen} parameter is set and
http://git-wip-us.apache.org/repos/asf/ignite/blob/457dcdbb/modules/core/src/main/java/org/apache/ignite/igfs/secondary/local/LocalIgfsSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/local/LocalIgfsSecondaryFileSystem.java b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/local/LocalIgfsSecondaryFileSystem.java
index 209c6d3..f5ddb1c 100644
--- a/modules/core/src/main/java/org/apache/ignite/igfs/secondary/local/LocalIgfsSecondaryFileSystem.java
+++ b/modules/core/src/main/java/org/apache/ignite/igfs/secondary/local/LocalIgfsSecondaryFileSystem.java
@@ -395,7 +395,7 @@ public class LocalIgfsSecondaryFileSystem implements IgfsSecondaryFileSystem, Li
}
/** {@inheritDoc} */
- @Override public void setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteException {
+ @Override public void setTimes(IgfsPath path, long modificationTime, long accessTime) throws IgniteException {
Path p = fileForPath(path).toPath();
if (!Files.exists(p))
http://git-wip-us.apache.org/repos/asf/ignite/blob/457dcdbb/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
index 4df6f59..812fd3b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsAsyncImpl.java
@@ -258,8 +258,8 @@ public class IgfsAsyncImpl extends AsyncSupportAdapter<IgniteFileSystem> impleme
}
/** {@inheritDoc} */
- @Override public void setTimes(IgfsPath path, long accessTime, long modificationTime) {
- igfs.setTimes(path, accessTime, modificationTime);
+ @Override public void setTimes(IgfsPath path, long modificationTime, long accessTime) {
+ igfs.setTimes(path, modificationTime, accessTime);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/457dcdbb/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
index 2057c71..8712756 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java
@@ -1186,7 +1186,7 @@ public final class IgfsImpl implements IgfsEx {
}
/** {@inheritDoc} */
- @Override public void setTimes(final IgfsPath path, final long accessTime, final long modificationTime) {
+ @Override public void setTimes(final IgfsPath path, final long modificationTime, final long accessTime) {
A.notNull(path, "path");
if (accessTime == -1 && modificationTime == -1)
@@ -1204,9 +1204,9 @@ public final class IgfsImpl implements IgfsEx {
IgfsMode mode = resolveMode(path);
if (mode == PROXY)
- secondaryFs.setTimes(path, accessTime, modificationTime);
+ secondaryFs.setTimes(path, modificationTime, accessTime);
else {
- meta.updateTimes(path, accessTime, modificationTime,
+ meta.updateTimes(path, modificationTime, accessTime,
IgfsUtils.isDualMode(mode) ? secondaryFs : null);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457dcdbb/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
index 203f383..4e7801d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsIpcHandler.java
@@ -382,7 +382,7 @@ class IgfsIpcHandler implements IgfsServerHandler {
break;
case SET_TIMES:
- igfs.setTimes(req.path(), req.accessTime(), req.modificationTime());
+ igfs.setTimes(req.path(), req.modificationTime(), req.accessTime());
res.response(true);
http://git-wip-us.apache.org/repos/asf/ignite/blob/457dcdbb/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
index 77272e1..a26239c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsMetaManager.java
@@ -2800,7 +2800,7 @@ public class IgfsMetaManager extends IgfsManager {
* @param secondaryFs Secondary file system.
* @throws IgniteCheckedException If failed.
*/
- public void updateTimes(IgfsPath path, long accessTime, long modificationTime,
+ public void updateTimes(IgfsPath path, long modificationTime, long accessTime,
IgfsSecondaryFileSystem secondaryFs) throws IgniteCheckedException {
while (true) {
if (busyLock.enterBusy()) {
@@ -2829,7 +2829,7 @@ public class IgfsMetaManager extends IgfsManager {
if (pathIds.allExists()) {
// All files are in place. Update both primary and secondary file systems.
if (secondaryFs != null)
- secondaryFs.setTimes(path, accessTime, modificationTime);
+ secondaryFs.setTimes(path, modificationTime, accessTime);
IgniteUuid targetId = pathIds.lastExistingId();
IgfsEntryInfo targetInfo = lockInfos.get(targetId);
@@ -2846,7 +2846,7 @@ public class IgfsMetaManager extends IgfsManager {
else {
// Propagate call to the secondary FS, as we might haven't cache this part yet.
if (secondaryFs != null) {
- secondaryFs.setTimes(path, accessTime, modificationTime);
+ secondaryFs.setTimes(path, modificationTime, accessTime);
return;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457dcdbb/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
index 1c135fe..332b03d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemImpl.java
@@ -119,8 +119,8 @@ class IgfsSecondaryFileSystemImpl implements IgfsSecondaryFileSystem {
}
/** {@inheritDoc} */
- @Override public void setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteException {
- igfs.setTimes(path, accessTime, modificationTime);
+ @Override public void setTimes(IgfsPath path, long modificationTime, long accessTime) throws IgniteException {
+ igfs.setTimes(path, modificationTime, accessTime);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/457dcdbb/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientSetTimesCallable.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientSetTimesCallable.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientSetTimesCallable.java
index 0914c3e..6a78f6d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientSetTimesCallable.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientSetTimesCallable.java
@@ -65,7 +65,7 @@ public class IgfsClientSetTimesCallable extends IgfsClientAbstractCallable<Void>
/** {@inheritDoc} */
@Override protected Void call0(IgfsContext ctx) throws Exception {
- ctx.igfs().setTimes(path, accessTime, modificationTime);
+ ctx.igfs().setTimes(path, modificationTime, accessTime);
return null;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457dcdbb/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/DefaultIgfsSecondaryFileSystemTestAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/DefaultIgfsSecondaryFileSystemTestAdapter.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/DefaultIgfsSecondaryFileSystemTestAdapter.java
index 93e0827..63a8016 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/DefaultIgfsSecondaryFileSystemTestAdapter.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/DefaultIgfsSecondaryFileSystemTestAdapter.java
@@ -107,7 +107,7 @@ public class DefaultIgfsSecondaryFileSystemTestAdapter implements IgfsSecondaryF
if (info == null)
throw new IOException("Path not found: " + path);
- return new T2<>(info.accessTime(), info.modificationTime());
+ return new T2<>(info.modificationTime(), info.accessTime());
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/457dcdbb/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
index 6122e5c..2e8269c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsAbstractSelfTest.java
@@ -852,7 +852,7 @@ public abstract class IgfsAbstractSelfTest extends IgfsAbstractBaseSelfTest {
}
// Change only access time.
- igfs.setTimes(path, info.accessTime() + 1000, -1);
+ igfs.setTimes(path, -1, info.accessTime() + 1000);
newInfo = igfs.info(path);
@@ -864,12 +864,12 @@ public abstract class IgfsAbstractSelfTest extends IgfsAbstractBaseSelfTest {
if (dual) {
T2<Long, Long> newSecondaryTimes = igfsSecondary.times(path.toString());
- assertEquals(newInfo.accessTime(), (long) newSecondaryTimes.get1());
- assertEquals(secondaryTimes.get2(), newSecondaryTimes.get2());
+ assertEquals(newInfo.accessTime(), (long) newSecondaryTimes.get2());
+ assertEquals(secondaryTimes.get1(), newSecondaryTimes.get1());
}
// Change only modification time.
- igfs.setTimes(path, -1, info.modificationTime() + 1000);
+ igfs.setTimes(path, info.modificationTime() + 1000, -1);
newInfo = igfs.info(path);
@@ -881,12 +881,11 @@ public abstract class IgfsAbstractSelfTest extends IgfsAbstractBaseSelfTest {
if (dual) {
T2<Long, Long> newSecondaryTimes = igfsSecondary.times(path.toString());
- assertEquals(newInfo.accessTime(), (long) newSecondaryTimes.get1());
- assertEquals(newInfo.modificationTime(), (long) newSecondaryTimes.get2());
+ assertEquals(newInfo.accessTime(), (long) newSecondaryTimes.get2());
}
// Change both.
- igfs.setTimes(path, info.accessTime() + 2000, info.modificationTime() + 2000);
+ igfs.setTimes(path, info.modificationTime() + 2000, info.accessTime() + 2000);
newInfo = igfs.info(path);
@@ -898,8 +897,8 @@ public abstract class IgfsAbstractSelfTest extends IgfsAbstractBaseSelfTest {
if (dual) {
T2<Long, Long> newSecondaryTimes = igfsSecondary.times(path.toString());
- assertEquals(newInfo.accessTime(), (long) newSecondaryTimes.get1());
- assertEquals(newInfo.modificationTime(), (long) newSecondaryTimes.get2());
+ assertEquals(newInfo.modificationTime(), (long) newSecondaryTimes.get1());
+ assertEquals(newInfo.accessTime(), (long) newSecondaryTimes.get2());
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457dcdbb/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java
index 12f46da..bc3ef31 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsDualAbstractSelfTest.java
@@ -1562,7 +1562,7 @@ public abstract class IgfsDualAbstractSelfTest extends IgfsAbstractSelfTest {
final long MAX_ALIGN_ON_SECOND = (long)Integer.MAX_VALUE * 1000;
- igfs.setTimes(FILE, MAX_ALIGN_ON_SECOND - 1000, MAX_ALIGN_ON_SECOND);
+ igfs.setTimes(FILE, MAX_ALIGN_ON_SECOND, MAX_ALIGN_ON_SECOND - 1000);
IgfsFile info = igfs.info(FILE);
@@ -1573,8 +1573,8 @@ public abstract class IgfsDualAbstractSelfTest extends IgfsAbstractSelfTest {
T2<Long, Long> secondaryTimes = igfsSecondary.times(FILE.toString());
- assertEquals(info.accessTime(), (long) secondaryTimes.get1());
- assertEquals(info.modificationTime(), (long) secondaryTimes.get2());
+ assertEquals(info.modificationTime(), (long) secondaryTimes.get1());
+ assertEquals(info.accessTime(), (long) secondaryTimes.get2());
try {
igfs.setTimes(FILE2, MAX_ALIGN_ON_SECOND, MAX_ALIGN_ON_SECOND);
http://git-wip-us.apache.org/repos/asf/ignite/blob/457dcdbb/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsLocalSecondaryFileSystemTestAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsLocalSecondaryFileSystemTestAdapter.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsLocalSecondaryFileSystemTestAdapter.java
index 2b10f6e..905ca13 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsLocalSecondaryFileSystemTestAdapter.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsLocalSecondaryFileSystemTestAdapter.java
@@ -127,7 +127,7 @@ public class IgfsLocalSecondaryFileSystemTestAdapter implements IgfsSecondaryFil
BasicFileAttributes attrs = Files.getFileAttributeView(path(path), BasicFileAttributeView.class)
.readAttributes();
- return new T2<>(attrs.lastAccessTime().toMillis(), attrs.lastModifiedTime().toMillis());
+ return new T2<>(attrs.lastModifiedTime().toMillis(), attrs.lastAccessTime().toMillis());
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/457dcdbb/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java
index 9a950ad..93bb802 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java
@@ -214,7 +214,7 @@ public class IgfsMock implements IgfsEx {
}
/** {@inheritDoc} */
- @Override public void setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteException {
+ @Override public void setTimes(IgfsPath path, long modificationTime, long accessTime) throws IgniteException {
throwUnsupported();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457dcdbb/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemInjectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemInjectionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemInjectionSelfTest.java
index c5df409..e17c2df 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemInjectionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemInjectionSelfTest.java
@@ -248,7 +248,7 @@ public class IgfsSecondaryFileSystemInjectionSelfTest extends GridCommonAbstract
}
/** {@inheritDoc} */
- @Override public void setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteException {
+ @Override public void setTimes(IgfsPath path, long modificationTime, long accessTime) throws IgniteException {
// No-op.
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457dcdbb/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemTestAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemTestAdapter.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemTestAdapter.java
index fe913df..20afd67 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemTestAdapter.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsSecondaryFileSystemTestAdapter.java
@@ -104,7 +104,7 @@ public interface IgfsSecondaryFileSystemTestAdapter {
* Get times for path.
*
* @param path Path.
- * @return Times for path.
+ * @return Times for path (modification time, access time).
* @throws IOException If failed.
*/
T2<Long, Long> times(String path) throws IOException;
http://git-wip-us.apache.org/repos/asf/ignite/blob/457dcdbb/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
index 82f641f..6eca1c2 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java
@@ -246,8 +246,8 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys
}
/** {@inheritDoc} */
- @Override public void setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteException {
- target.setTimes(path, accessTime, modificationTime);
+ @Override public void setTimes(IgfsPath path, long modificationTime, long accessTime) throws IgniteException {
+ target.setTimes(path, modificationTime, accessTime);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/457dcdbb/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java
index 9c7febf..55d8521 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/delegate/HadoopIgfsSecondaryFileSystemDelegateImpl.java
@@ -376,7 +376,7 @@ public class HadoopIgfsSecondaryFileSystemDelegateImpl implements HadoopIgfsSeco
}
/** {@inheritDoc} */
- @Override public void setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteException {
+ @Override public void setTimes(IgfsPath path, long modificationTime, long accessTime) throws IgniteException {
try {
// We don't use FileSystem#getUsed() since it counts only the files
// in the filesystem root, not all the files recursively.
http://git-wip-us.apache.org/repos/asf/ignite/blob/457dcdbb/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsInProc.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsInProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsInProc.java
index 0577c73..e09e102 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsInProc.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsInProc.java
@@ -297,7 +297,7 @@ public class HadoopIgfsInProc implements HadoopIgfsEx {
try {
IgfsUserContext.doAs(user, new IgniteOutClosure<Void>() {
@Override public Void apply() {
- igfs.setTimes(path, accessTime, modificationTime);
+ igfs.setTimes(path, modificationTime, accessTime);
return null;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/457dcdbb/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsSecondaryFileSystemTestAdapter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsSecondaryFileSystemTestAdapter.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsSecondaryFileSystemTestAdapter.java
index 453d0c7..06f1234 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsSecondaryFileSystemTestAdapter.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/igfs/HadoopIgfsSecondaryFileSystemTestAdapter.java
@@ -132,7 +132,7 @@ public class HadoopIgfsSecondaryFileSystemTestAdapter implements IgfsSecondaryFi
@Override public T2<Long, Long> times(String path) throws IOException {
FileStatus status = get().getFileStatus(new Path(path));
- return new T2<>(status.getAccessTime(), status.getModificationTime());
+ return new T2<>(status.getModificationTime(), status.getAccessTime());
}
/** {@inheritDoc} */
[5/5] ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-2.0' into ignite-1794
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-2.0' into ignite-1794
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/45c38bec
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/45c38bec
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/45c38bec
Branch: refs/heads/ignite-1794
Commit: 45c38beceaa978f163f2b3b861fb7a556ed64b76
Parents: 09e34270 2334160
Author: sboikov <sb...@gridgain.com>
Authored: Thu Apr 20 15:22:08 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Apr 20 15:22:08 2017 +0300
----------------------------------------------------------------------
.../org/apache/ignite/IgniteFileSystem.java | 4 +-
.../igfs/secondary/IgfsSecondaryFileSystem.java | 4 +-
.../local/LocalIgfsSecondaryFileSystem.java | 2 +-
.../processors/cache/GridCacheAdapter.java | 4 +-
.../processors/cache/GridCacheUtils.java | 117 ++--
.../datastructures/DataStructuresProcessor.java | 61 +-
.../datastructures/GridCacheAtomicLongImpl.java | 626 +++++++++++--------
.../GridCacheAtomicReferenceImpl.java | 276 ++++----
.../GridCacheAtomicSequenceImpl.java | 88 +--
.../GridCacheAtomicStampedImpl.java | 293 ++++-----
.../GridCacheCountDownLatchImpl.java | 56 +-
.../datastructures/GridCacheLockImpl.java | 80 +--
.../datastructures/GridCacheQueueProxy.java | 292 +--------
.../datastructures/GridCacheSemaphoreImpl.java | 56 +-
.../datastructures/GridCacheSetProxy.java | 152 +----
.../GridTransactionalCacheQueueImpl.java | 8 +-
.../internal/processors/igfs/IgfsAsyncImpl.java | 4 +-
.../internal/processors/igfs/IgfsImpl.java | 6 +-
.../processors/igfs/IgfsIpcHandler.java | 2 +-
.../processors/igfs/IgfsMetaManager.java | 6 +-
.../igfs/IgfsSecondaryFileSystemImpl.java | 4 +-
.../igfs/client/IgfsClientSetTimesCallable.java | 2 +-
...faultIgfsSecondaryFileSystemTestAdapter.java | 2 +-
.../processors/igfs/IgfsAbstractSelfTest.java | 17 +-
.../igfs/IgfsDualAbstractSelfTest.java | 6 +-
...IgfsLocalSecondaryFileSystemTestAdapter.java | 2 +-
.../internal/processors/igfs/IgfsMock.java | 2 +-
...gfsSecondaryFileSystemInjectionSelfTest.java | 2 +-
.../IgfsSecondaryFileSystemTestAdapter.java | 2 +-
.../fs/IgniteHadoopIgfsSecondaryFileSystem.java | 4 +-
...doopIgfsSecondaryFileSystemDelegateImpl.java | 2 +-
.../hadoop/impl/igfs/HadoopIgfsInProc.java | 2 +-
...adoopIgfsSecondaryFileSystemTestAdapter.java | 2 +-
33 files changed, 850 insertions(+), 1336 deletions(-)
----------------------------------------------------------------------
[4/5] ignite git commit: Merge remote-tracking branch
'origin/ignite-2.0' into ignite-2.0
Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/ignite-2.0' into ignite-2.0
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/23341609
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/23341609
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/23341609
Branch: refs/heads/ignite-1794
Commit: 2334160919a77ea82a4d05f200ba7981e8035f44
Parents: 457dcdb ee955df
Author: devozerov <vo...@gridgain.com>
Authored: Thu Apr 20 13:32:25 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Thu Apr 20 13:32:25 2017 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAdapter.java | 4 +-
.../processors/cache/GridCacheUtils.java | 117 ++--
.../datastructures/DataStructuresProcessor.java | 61 +-
.../datastructures/GridCacheAtomicLongImpl.java | 626 +++++++++++--------
.../GridCacheAtomicReferenceImpl.java | 276 ++++----
.../GridCacheAtomicSequenceImpl.java | 88 +--
.../GridCacheAtomicStampedImpl.java | 293 ++++-----
.../GridCacheCountDownLatchImpl.java | 56 +-
.../datastructures/GridCacheLockImpl.java | 80 +--
.../datastructures/GridCacheQueueProxy.java | 292 +--------
.../datastructures/GridCacheSemaphoreImpl.java | 56 +-
.../datastructures/GridCacheSetProxy.java | 152 +----
.../GridTransactionalCacheQueueImpl.java | 8 +-
13 files changed, 812 insertions(+), 1297 deletions(-)
----------------------------------------------------------------------