You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2017/04/13 12:42:57 UTC
[2/2] ignite git commit: ignite-4681 Apply new future adapter
ignite-4681 Apply new future adapter
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e922dda6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e922dda6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e922dda6
Branch: refs/heads/master
Commit: e922dda6e9230ff7715f83c7b81e5656e8e856a0
Parents: dd4a5c4
Author: Igor Seliverstov <gv...@gmail.com>
Authored: Thu Apr 13 15:41:54 2017 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Thu Apr 13 15:42:50 2017 +0300
----------------------------------------------------------------------
.../jmh/future/JmhFutureAdapterBenchmark.java | 145 ++++++
.../ignite/internal/IgniteInternalFuture.java | 15 -
.../cache/GridCacheCompoundFuture.java | 63 +++
.../cache/GridCacheCompoundIdentityFuture.java | 63 +++
.../processors/cache/GridCacheFuture.java | 15 +
.../cache/GridCacheFutureAdapter.java | 61 +++
.../distributed/GridCacheTxRecoveryFuture.java | 9 +-
.../dht/CacheDistributedGetFutureAdapter.java | 4 +-
.../distributed/dht/GridDhtLockFuture.java | 33 +-
.../distributed/dht/GridDhtTxFinishFuture.java | 4 +-
.../distributed/dht/GridDhtTxPrepareFuture.java | 16 +-
.../dht/GridPartitionedSingleGetFuture.java | 4 +-
.../GridDhtAtomicAbstractUpdateFuture.java | 4 +-
.../GridNearAtomicAbstractUpdateFuture.java | 8 +-
.../GridNearAtomicSingleUpdateFuture.java | 24 +-
.../dht/atomic/GridNearAtomicUpdateFuture.java | 24 +-
.../colocated/GridDhtColocatedLockFuture.java | 23 +-
.../GridDhtPartitionsExchangeFuture.java | 35 +-
.../distributed/near/GridNearLockFuture.java | 20 +-
...arOptimisticSerializableTxPrepareFuture.java | 2 +-
.../near/GridNearOptimisticTxPrepareFuture.java | 9 +-
.../GridNearPessimisticTxPrepareFuture.java | 5 +-
.../near/GridNearTxFinishFuture.java | 13 +-
.../cache/distributed/near/GridNearTxLocal.java | 2 +-
.../near/GridNearTxPrepareFutureAdapter.java | 4 +-
.../cache/local/GridLocalLockFuture.java | 6 +-
.../query/GridCacheDistributedQueryFuture.java | 18 +-
.../query/GridCacheQueryFutureAdapter.java | 31 +-
.../cache/transactions/TxDeadlockDetection.java | 5 +-
.../platform/compute/PlatformCompute.java | 10 -
.../tcp/GridTcpMemcachedNioListener.java | 20 +-
.../util/future/GridCompoundFuture.java | 45 +-
.../util/future/GridFinishedFuture.java | 13 -
.../internal/util/future/GridFutureAdapter.java | 479 ++++++++++---------
.../internal/util/future/IgniteFutureImpl.java | 10 -
.../org/apache/ignite/lang/IgniteFuture.java | 15 -
.../GridCacheOrderedPreloadingSelfTest.java | 48 +-
.../util/future/IgniteFutureImplTest.java | 38 --
.../external/HadoopExternalTaskExecutor.java | 2 +-
.../processors/schedule/ScheduleFutureImpl.java | 20 -
40 files changed, 810 insertions(+), 555 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/future/JmhFutureAdapterBenchmark.java
----------------------------------------------------------------------
diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/future/JmhFutureAdapterBenchmark.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/future/JmhFutureAdapterBenchmark.java
new file mode 100644
index 0000000..ef3643a
--- /dev/null
+++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/future/JmhFutureAdapterBenchmark.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmarks.jmh.future;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.benchmarks.jmh.JmhAbstractBenchmark;
+import org.apache.ignite.internal.benchmarks.jmh.runner.JmhIdeBenchmarkRunner;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+
+/**
+ *
+ */
+public class JmhFutureAdapterBenchmark extends JmhAbstractBenchmark {
+ /** */
+ private static final IgniteInClosure<IgniteInternalFuture<Long>> LSNR = new IgniteInClosure<IgniteInternalFuture<Long>>() {
+ /** {@inheritDoc} */
+ @Override public void apply(IgniteInternalFuture<Long> fut) {
+ // No-op
+ }
+ };
+
+ /** */
+ private static final Long RES = 0L;
+
+ /**
+ *
+ */
+ @State(Scope.Thread)
+ public static class CompleteState {
+ /** */
+ private final BlockingQueue<GridFutureAdapter<Long>> queue = new ArrayBlockingQueue<>(10);
+
+ /** */
+ private final Thread compleete = new Thread() {
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ while (!Thread.interrupted()) {
+ GridFutureAdapter<Long> fut = queue.poll();
+ if (fut != null)
+ fut.onDone(RES);
+ }
+ }
+ };
+
+ /**
+ *
+ */
+ @Setup public void setup() {
+ compleete.start();
+ }
+
+ /**
+ * @throws InterruptedException If failed.
+ */
+ @TearDown public void destroy() throws InterruptedException {
+ compleete.interrupt();
+ compleete.join();
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Benchmark
+ public void testSimpleGet() throws Exception {
+ GridFutureAdapter<Long> fut = new GridFutureAdapter<>();
+ fut.onDone(RES);
+ fut.get();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ @Benchmark
+ public void testSimpleGetWithListener() throws Exception {
+ GridFutureAdapter<Long> fut = new GridFutureAdapter<>();
+ fut.listen(LSNR);
+ fut.onDone(RES);
+ fut.get();
+ }
+
+ /**
+ * @param state Benchmark context.
+ * @throws Exception If failed.
+ */
+ @Benchmark
+ @Threads(4)
+ public void completeFutureGet(CompleteState state) throws Exception {
+ GridFutureAdapter<Long> fut = new GridFutureAdapter<>();
+ state.queue.put(fut);
+ fut.get();
+ }
+
+ /**
+ * Run benchmarks.
+ *
+ * @param args Arguments.
+ * @throws Exception If failed.
+ */
+ public static void main(String[] args) throws Exception {
+ run(8);
+ }
+
+ /**
+ * Run benchmark.
+ *
+ * @param threads Amount of threads.
+ * @throws Exception If failed.
+ */
+ private static void run(int threads) throws Exception {
+ JmhIdeBenchmarkRunner.create()
+ .forks(1)
+ .threads(threads)
+ .warmupIterations(30)
+ .measurementIterations(30)
+ .benchmarks(JmhFutureAdapterBenchmark.class.getSimpleName())
+ .jvmArguments("-Xms4g", "-Xmx4g")
+ .run();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
index 789556d..76f8c71 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
@@ -103,21 +103,6 @@ public interface IgniteInternalFuture<R> {
public boolean isCancelled();
/**
- * Gets start time for this future.
- *
- * @return Start time for this future.
- */
- public long startTime();
-
- /**
- * Gets duration in milliseconds between start of the future and current time if future
- * is not finished, or between start and finish of this future.
- *
- * @return Time in milliseconds this future has taken to execute.
- */
- public long duration();
-
- /**
* Registers listener closure to be asynchronously notified whenever future completes.
*
* @param lsnr Listener closure to register. If not provided - this method is no-op.
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheCompoundFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheCompoundFuture.java
new file mode 100644
index 0000000..9869d4a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheCompoundFuture.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteReducer;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public abstract class GridCacheCompoundFuture<T, R> extends GridCompoundFuture<T, R> implements GridCacheFuture<R> {
+ /** Future start time. */
+ private final long startTime = U.currentTimeMillis();
+
+ /** Future end time. */
+ private volatile long endTime;
+
+ /**
+ * @param rdc Reducer.
+ */
+ protected GridCacheCompoundFuture(@Nullable IgniteReducer<T, R> rdc) {
+ super(rdc);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long startTime() {
+ return startTime;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long duration() {
+ long endTime = this.endTime;
+
+ return (endTime == 0 ? U.currentTimeMillis() : endTime) - startTime;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean onDone(@Nullable R res, @Nullable Throwable err, boolean cancel) {
+ if(super.onDone(res, err, cancel)){
+ endTime = U.currentTimeMillis();
+ return true;
+ }
+
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheCompoundIdentityFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheCompoundIdentityFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheCompoundIdentityFuture.java
new file mode 100644
index 0000000..8fd619a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheCompoundIdentityFuture.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteReducer;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public abstract class GridCacheCompoundIdentityFuture<T> extends GridCompoundIdentityFuture<T> implements GridCacheFuture<T> {
+ /** Future start time. */
+ private final long startTime = U.currentTimeMillis();
+
+ /** Future end time. */
+ private volatile long endTime;
+
+ /**
+ * @param rdc Reducer.
+ */
+ protected GridCacheCompoundIdentityFuture(@Nullable IgniteReducer<T, T> rdc) {
+ super(rdc);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long startTime() {
+ return startTime;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long duration() {
+ long endTime = this.endTime;
+
+ return (endTime == 0 ? U.currentTimeMillis() : endTime) - startTime;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean onDone(@Nullable T res, @Nullable Throwable err, boolean cancel) {
+ if(super.onDone(res, err, cancel)){
+ endTime = U.currentTimeMillis();
+ return true;
+ }
+
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFuture.java
index 8bf8d40..90a219a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFuture.java
@@ -26,6 +26,21 @@ import org.apache.ignite.lang.IgniteUuid;
*/
public interface GridCacheFuture<R> extends IgniteInternalFuture<R> {
/**
+ * Gets start time for this future.
+ *
+ * @return Start time for this future.
+ */
+ public long startTime();
+
+ /**
+ * Gets duration in milliseconds between start of the future and current time if future
+ * is not finished, or between start and finish of this future.
+ *
+ * @return Time in milliseconds this future has taken to execute.
+ */
+ public long duration();
+
+ /**
* @return Unique identifier for this future.
*/
public IgniteUuid futureId();
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFutureAdapter.java
new file mode 100644
index 0000000..babd707
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheFutureAdapter.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public abstract class GridCacheFutureAdapter<R> extends GridFutureAdapter<R> implements GridCacheFuture<R> {
+ /** Future start time. */
+ private final long startTime = U.currentTimeMillis();
+
+ /** Future end time. */
+ private volatile long endTime;
+
+ /**
+ * Default constructor.
+ */
+ public GridCacheFutureAdapter() {
+ }
+
+ /** {@inheritDoc} */
+ @Override public long startTime() {
+ return startTime;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long duration() {
+ long endTime = this.endTime;
+
+ return endTime == 0 ? U.currentTimeMillis() - startTime : endTime - startTime;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean onDone(@Nullable R res, @Nullable Throwable err, boolean cancel) {
+ if(super.onDone(res, err, cancel)){
+ endTime = U.currentTimeMillis();
+ return true;
+ }
+
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
index e27f777..1c97de2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java
@@ -28,11 +28,11 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture;
import org.apache.ignite.internal.processors.cache.GridCacheFuture;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.util.GridLeanMap;
-import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.C1;
@@ -49,7 +49,7 @@ import static org.apache.ignite.transactions.TransactionState.PREPARED;
/**
* Future verifying that all remote transactions related to transaction were prepared or committed.
*/
-public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolean> implements GridCacheFuture<Boolean> {
+public class GridCacheTxRecoveryFuture extends GridCacheCompoundIdentityFuture<Boolean> {
/** */
private static final long serialVersionUID = 0L;
@@ -426,7 +426,7 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea
@SuppressWarnings("ForLoopReplaceableByForEach")
private MiniFuture miniFuture(IgniteUuid miniId) {
// We iterate directly over the futs collection here to avoid copy.
- synchronized (sync) {
+ synchronized (this) {
int size = futuresCountNoLock();
// Avoid iterator creation.
@@ -547,9 +547,6 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea
*
*/
private class MiniFuture extends GridFutureAdapter<Boolean> {
- /** */
- private static final long serialVersionUID = 0L;
-
/** Mini future ID. */
private final IgniteUuid futId = IgniteUuid.randomUuid();
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
index 4381dfd..259b096 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java
@@ -25,11 +25,11 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheFuture;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.lang.IgniteUuid;
@@ -42,7 +42,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
/**
*
*/
-public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoundIdentityFuture<Map<K, V>>
+public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCacheCompoundIdentityFuture<Map<K, V>>
implements GridCacheFuture<Map<K, V>>, CacheGetFuture {
/** Default max remap count value. */
public static final int DFLT_MAX_REMAP_CNT = 3;
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index a0270b0..1a7c2c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CacheLockCandidates;
import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
@@ -54,7 +55,6 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.dr.GridDrType;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
-import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -77,7 +77,7 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRELOAD;
/**
* Cache lock future.
*/
-public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
+public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture<Boolean>
implements GridCacheMvccFuture<Boolean>, GridDhtFuture<Boolean>, GridCacheMappedVersion {
/** */
private static final long serialVersionUID = 0L;
@@ -298,10 +298,8 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
/**
* @return Entries.
*/
- public Collection<GridDhtCacheEntry> entriesCopy() {
- synchronized (sync) {
- return new ArrayList<>(entries());
- }
+ public synchronized Collection<GridDhtCacheEntry> entriesCopy() {
+ return new ArrayList<>(entries());
}
/**
@@ -395,7 +393,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
return null;
}
- synchronized (sync) {
+ synchronized (this) {
entries.add(c == null || c.reentry() ? null : entry);
if (c != null && !c.reentry())
@@ -529,7 +527,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
@SuppressWarnings("ForLoopReplaceableByForEach")
private MiniFuture miniFuture(IgniteUuid miniId) {
// We iterate directly over the futs collection here to avoid copy.
- synchronized (sync) {
+ synchronized (this) {
int size = futuresCountNoLock();
// Avoid iterator creation.
@@ -599,7 +597,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
* @param t Error.
*/
public void onError(Throwable t) {
- synchronized (sync) {
+ synchronized (this) {
if (err != null)
return;
@@ -646,7 +644,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
log.debug("Received onOwnerChanged() callback [entry=" + entry + ", owner=" + owner + "]");
if (owner != null && owner.version().equals(lockVer)) {
- synchronized (sync) {
+ synchronized (this) {
if (!pendingLocks.remove(entry.key()))
return false;
}
@@ -663,10 +661,8 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
/**
* @return {@code True} if locks have been acquired.
*/
- private boolean checkLocks() {
- synchronized (sync) {
- return pendingLocks.isEmpty();
- }
+ private synchronized boolean checkLocks() {
+ return pendingLocks.isEmpty();
}
/** {@inheritDoc} */
@@ -697,7 +693,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
if (isDone() || (err == null && success && !checkLocks()))
return false;
- synchronized (sync) {
+ synchronized (this) {
if (this.err == null)
this.err = err;
}
@@ -776,7 +772,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
* @param entries Entries.
*/
private void map(Iterable<GridDhtCacheEntry> entries) {
- synchronized (sync) {
+ synchronized (this) {
if (mapped)
return;
@@ -1120,7 +1116,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
if (log.isDebugEnabled())
log.debug("Timed out waiting for lock response: " + this);
- synchronized (sync) {
+ synchronized (GridDhtLockFuture.this) {
timedOut = true;
// Stop locks and responses processing.
@@ -1146,9 +1142,6 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean>
*/
private class MiniFuture extends GridFutureAdapter<Boolean> {
/** */
- private static final long serialVersionUID = 0L;
-
- /** */
private final IgniteUuid futId = IgniteUuid.randomUuid();
/** Node. */
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
index 17e9047..23d7657 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java
@@ -28,12 +28,12 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture;
import org.apache.ignite.internal.processors.cache.GridCacheFuture;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
-import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -50,7 +50,7 @@ import static org.apache.ignite.transactions.TransactionState.COMMITTING;
/**
*
*/
-public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFuture<IgniteInternalTx>
+public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentityFuture<IgniteInternalTx>
implements GridCacheFuture<IgniteInternalTx> {
/** */
private static final long serialVersionUID = 0L;
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index 93ea30d..964d423 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheInvokeEntry;
import org.apache.ignite.internal.processors.cache.CacheLockCandidates;
import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheCompoundFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
@@ -97,7 +98,7 @@ import static org.apache.ignite.transactions.TransactionState.PREPARED;
*
*/
@SuppressWarnings("unchecked")
-public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInternalTx, GridNearTxPrepareResponse>
+public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<IgniteInternalTx, GridNearTxPrepareResponse>
implements GridCacheMvccFuture<GridNearTxPrepareResponse> {
/** */
private static final long serialVersionUID = 0L;
@@ -279,7 +280,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
boolean rmv;
- synchronized (sync) {
+ synchronized (this) {
rmv = lockKeys.remove(entry.txKey());
}
@@ -310,7 +311,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
if (!locksReady)
return false;
- synchronized (sync) {
+ synchronized (this) {
return lockKeys.isEmpty();
}
}
@@ -564,7 +565,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
@SuppressWarnings("ForLoopReplaceableByForEach")
private MiniFuture miniFuture(int miniId) {
// We iterate directly over the futs collection here to avoid copy.
- synchronized (sync) {
+ synchronized (this) {
int size = futuresCountNoLock();
// Avoid iterator creation.
@@ -623,7 +624,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
}
if (tx.optimistic() && txEntry.explicitVersion() == null) {
- synchronized (sync) {
+ synchronized (this) {
lockKeys.add(txEntry.txKey());
}
}
@@ -1597,9 +1598,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
*/
private class MiniFuture extends GridFutureAdapter<IgniteInternalTx> {
/** */
- private static final long serialVersionUID = 0L;
-
- /** */
private final int futId;
/** Node ID. */
@@ -1811,7 +1809,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter
/** {@inheritDoc} */
@Override public void onTimeout() {
- synchronized (sync) {
+ synchronized (GridDhtTxPrepareFuture.this) {
clear();
lockKeys.clear();
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index 47f4066..3af691c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheFutureAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -45,7 +46,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetR
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CIX1;
@@ -61,7 +61,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
/**
*
*/
-public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> implements GridCacheFuture<Object>,
+public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Object> implements GridCacheFuture<Object>,
CacheGetFuture {
/** */
private static final long serialVersionUID = 0L;
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 0940acb..039cb99 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -36,12 +36,12 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheFutureAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
@@ -58,7 +58,7 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC
/**
* DHT atomic cache backup update future.
*/
-public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapter<Void>
+public abstract class GridDhtAtomicAbstractUpdateFuture extends GridCacheFutureAdapter<Void>
implements GridCacheAtomicFuture<Void> {
/** */
private static final long serialVersionUID = 0L;
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index a2adb05..122e17c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheFutureAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheMvccManager;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
@@ -58,7 +59,7 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC
/**
* Base for near atomic update futures.
*/
-public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapter<Object>
+public abstract class GridNearAtomicAbstractUpdateFuture extends GridCacheFutureAdapter<Object>
implements GridCacheAtomicFuture<Object> {
/** Logger reference. */
private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
@@ -114,9 +115,6 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
/** Near cache flag. */
protected final boolean nearEnabled;
- /** Mutex to synchronize state updates. */
- protected final Object mux = new Object();
-
/** Topology locked flag. Set if atomic update is performed inside a TX or explicit lock. */
protected boolean topLocked;
@@ -138,7 +136,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
/** Future ID. */
@GridToStringInclude
- protected long futId;
+ protected volatile long futId;
/** Operation result. */
protected GridCacheReturn opRes;
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index e4ba457..11c3336 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -126,9 +126,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
/** {@inheritDoc} */
@Override public long id() {
- synchronized (mux) {
- return futId;
- }
+ return futId;
}
/** {@inheritDoc} */
@@ -141,7 +139,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
boolean rcvAll = false;
- synchronized (mux) {
+ synchronized (this) {
if (reqState == null)
return false;
@@ -215,7 +213,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
CachePartialUpdateCheckedException err0;
AffinityTopologyVersion remapTopVer0;
- synchronized (mux) {
+ synchronized (this) {
if (futId == 0 || futId != res.futureId())
return;
@@ -257,7 +255,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
GridCacheReturn opRes0 = null;
CachePartialUpdateCheckedException err0 = null;
- synchronized (mux) {
+ synchronized (this) {
if (futId == 0 || futId != res.futureId())
return;
@@ -331,7 +329,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
* @return Non-null topology version if update should be remapped.
*/
private AffinityTopologyVersion onAllReceived() {
- assert Thread.holdsLock(mux);
+ assert Thread.holdsLock(this);
assert futId > 0;
AffinityTopologyVersion remapTopVer0 = null;
@@ -488,7 +486,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
try {
reqState0 = mapSingleUpdate(topVer, futId);
- synchronized (mux) {
+ synchronized (this) {
assert this.futId == 0 : this;
assert this.topVer == AffinityTopologyVersion.ZERO : this;
@@ -537,7 +535,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
GridNearAtomicCheckUpdateRequest checkReq = null;
- synchronized (mux) {
+ synchronized (this) {
if (this.futId == 0 || this.futId != futId)
return;
@@ -568,7 +566,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
private Long onFutureDone() {
Long id0;
- synchronized (mux) {
+ synchronized (this) {
id0 = futId;
futId = 0;
@@ -734,9 +732,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
}
/** {@inheritDoc} */
- public String toString() {
- synchronized (mux) {
- return S.toString(GridNearAtomicSingleUpdateFuture.class, this, super.toString());
- }
+ public synchronized String toString() {
+ return S.toString(GridNearAtomicSingleUpdateFuture.class, this, super.toString());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index 84deefc..6198de4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -151,9 +151,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
/** {@inheritDoc} */
@Override public long id() {
- synchronized (mux) {
- return futId;
- }
+ return futId;
}
/** {@inheritDoc} */
@@ -166,7 +164,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
List<GridNearAtomicCheckUpdateRequest> checkReqs = null;
- synchronized (mux) {
+ synchronized (this) {
if (futId == 0)
return false;
@@ -299,7 +297,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
CachePartialUpdateCheckedException err0;
AffinityTopologyVersion remapTopVer0;
- synchronized (mux) {
+ synchronized (this) {
if (futId == 0 || futId != res.futureId())
return;
@@ -372,7 +370,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
boolean rcvAll;
- synchronized (mux) {
+ synchronized (this) {
if (futId == 0 || futId != res.futureId())
return;
@@ -534,7 +532,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
* @return Non null topology version if update should be remapped.
*/
@Nullable private AffinityTopologyVersion onAllReceived() {
- assert Thread.holdsLock(mux);
+ assert Thread.holdsLock(this);
assert futId > 0;
AffinityTopologyVersion remapTopVer0 = null;
@@ -801,7 +799,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
}
}
- synchronized (mux) {
+ synchronized (this) {
assert this.futId == 0 : this;
assert this.topVer == AffinityTopologyVersion.ZERO : this;
@@ -866,7 +864,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
boolean rcvAll = false;
- synchronized (mux) {
+ synchronized (this) {
if (this.futId == 0 || this.futId != futId)
return;
@@ -938,7 +936,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
private Long onFutureDone() {
Long id0;
- synchronized (mux) {
+ synchronized (this) {
id0 = futId;
futId = 0;
@@ -1181,9 +1179,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
}
/** {@inheritDoc} */
- public String toString() {
- synchronized (mux) {
- return S.toString(GridNearAtomicUpdateFuture.class, this, super.toString());
- }
+ public synchronized String toString() {
+ return S.toString(GridNearAtomicUpdateFuture.class, this, super.toString());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
index 79c15fb..8512298 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
@@ -56,7 +57,6 @@ import org.apache.ignite.internal.processors.cache.transactions.TxDeadlock;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
-import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -82,7 +82,7 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
/**
* Colocated cache lock future.
*/
-public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture<Boolean>
+public final class GridDhtColocatedLockFuture extends GridCacheCompoundIdentityFuture<Boolean>
implements GridCacheMvccFuture<Boolean> {
/** */
private static final long serialVersionUID = 0L;
@@ -203,7 +203,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
this.skipStore = skipStore;
this.keepBinary = keepBinary;
- ignoreInterrupts(true);
+ ignoreInterrupts();
threadId = tx == null ? Thread.currentThread().getId() : tx.threadId();
@@ -452,13 +452,11 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
/**
* @return Keys for which locks requested from remote nodes but response isn't received.
*/
- public Set<IgniteTxKey> requestedKeys() {
- synchronized (sync) {
- if (timeoutObj != null && timeoutObj.requestedKeys != null)
- return timeoutObj.requestedKeys;
+ public synchronized Set<IgniteTxKey> requestedKeys() {
+ if (timeoutObj != null && timeoutObj.requestedKeys != null)
+ return timeoutObj.requestedKeys;
- return requestedKeys0();
- }
+ return requestedKeys0();
}
/**
@@ -490,7 +488,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
@SuppressWarnings({"ForLoopReplaceableByForEach", "IfMayBeConditional"})
private MiniFuture miniFuture(int miniId) {
// We iterate directly over the futs collection here to avoid copy.
- synchronized (sync) {
+ synchronized (this) {
int size = futuresCountNoLock();
// Avoid iterator creation.
@@ -1341,7 +1339,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
log.debug("Timed out waiting for lock response: " + this);
if (inTx() && cctx.tm().deadlockDetectionEnabled()) {
- synchronized (sync) {
+ synchronized (GridDhtColocatedLockFuture.this) {
requestedKeys = requestedKeys0();
clear(); // Stop response processing.
@@ -1390,9 +1388,6 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture
*/
private class MiniFuture extends GridFutureAdapter<Boolean> {
/** */
- private static final long serialVersionUID = 0L;
-
- /** */
private final int futId;
/** Node ID. */
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index f41da2b..55aca2a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -171,9 +171,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
@GridToStringInclude
private volatile IgniteInternalFuture<?> partReleaseFut;
- /** */
- private final Object mux = new Object();
-
/** Logger. */
private IgniteLogger log;
@@ -1087,7 +1084,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
if (super.onDone(res, err) && realExchange) {
if (log.isDebugEnabled())
log.debug("Completed partition exchange [localNode=" + cctx.localNodeId() + ", exchange= " + this +
- "duration=" + duration() + ", durationFromInit=" + (U.currentTimeMillis() - initTs) + ']');
+ ", durationFromInit=" + (U.currentTimeMillis() - initTs) + ']');
initFut.onDone(err == null);
@@ -1201,7 +1198,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
boolean allReceived = false;
boolean updateSingleMap = false;
- synchronized (mux) {
+ synchronized (this) {
assert crd != null;
if (crd.isLocal()) {
@@ -1222,13 +1219,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
updatePartitionSingleMap(msg);
}
finally {
- synchronized (mux) {
+ synchronized (this) {
assert pendingSingleUpdates > 0;
pendingSingleUpdates--;
if (pendingSingleUpdates == 0)
- mux.notifyAll();
+ notifyAll();
}
}
}
@@ -1243,15 +1240,13 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
/**
*
*/
- private void awaitSingleMapUpdates() {
- synchronized (mux) {
- try {
- while (pendingSingleUpdates > 0)
- U.wait(mux);
- }
- catch (IgniteInterruptedCheckedException e) {
- U.warn(log, "Failed to wait for partition map updates, thread was interrupted: " + e);
- }
+ private synchronized void awaitSingleMapUpdates() {
+ try {
+ while (pendingSingleUpdates > 0)
+ U.wait(this);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ U.warn(log, "Failed to wait for partition map updates, thread was interrupted: " + e);
}
}
@@ -1316,7 +1311,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
else {
List<ClusterNode> nodes;
- synchronized (mux) {
+ synchronized (this) {
srvNodes.remove(cctx.localNode());
nodes = new ArrayList<>(srvNodes);
@@ -1423,7 +1418,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
assert msg.exchangeId().equals(exchId) : msg;
assert msg.lastVersion() != null : msg;
- synchronized (mux) {
+ synchronized (this) {
if (crd == null)
return;
@@ -1605,7 +1600,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
discoCache.updateAlives(node);
- synchronized (mux) {
+ synchronized (this) {
if (!srvNodes.remove(node))
return;
@@ -1755,7 +1750,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
Set<UUID> remaining;
List<ClusterNode> srvNodes;
- synchronized (mux) {
+ synchronized (this) {
remaining = new HashSet<>(this.remaining);
srvNodes = this.srvNodes != null ? new ArrayList<>(this.srvNodes) : null;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index 1948df0..8de01c9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
@@ -54,7 +55,6 @@ import org.apache.ignite.internal.processors.cache.transactions.TxDeadlock;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
-import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -80,7 +80,7 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
/**
* Cache lock future.
*/
-public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean>
+public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Boolean>
implements GridCacheMvccFuture<Boolean> {
/** */
private static final long serialVersionUID = 0L;
@@ -209,7 +209,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
this.skipStore = skipStore;
this.keepBinary = keepBinary;
- ignoreInterrupts(true);
+ ignoreInterrupts();
threadId = tx == null ? Thread.currentThread().getId() : tx.threadId();
@@ -499,13 +499,11 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
/**
* @return Keys for which locks requested from remote nodes but response isn't received.
*/
- public Set<IgniteTxKey> requestedKeys() {
- synchronized (sync) {
- if (timeoutObj != null && timeoutObj.requestedKeys != null)
- return timeoutObj.requestedKeys;
+ public synchronized Set<IgniteTxKey> requestedKeys() {
+ if (timeoutObj != null && timeoutObj.requestedKeys != null)
+ return timeoutObj.requestedKeys;
- return requestedKeys0();
- }
+ return requestedKeys0();
}
/**
@@ -537,7 +535,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
@SuppressWarnings({"ForLoopReplaceableByForEach", "IfMayBeConditional"})
private MiniFuture miniFuture(int miniId) {
// We iterate directly over the futs collection here to avoid copy.
- synchronized (sync) {
+ synchronized (this) {
int size = futuresCountNoLock();
// Avoid iterator creation.
@@ -1440,7 +1438,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean
timedOut = true;
if (inTx() && cctx.tm().deadlockDetectionEnabled()) {
- synchronized (sync) {
+ synchronized (GridNearLockFuture.this) {
requestedKeys = requestedKeys0();
clear(); // Stop response processing.
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 80508dc..cbd9d23 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -227,7 +227,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim
@SuppressWarnings("ForLoopReplaceableByForEach")
private MiniFuture miniFuture(int miniId) {
// We iterate directly over the futs collection here to avoid copy.
- synchronized (sync) {
+ synchronized (GridNearOptimisticSerializableTxPrepareFuture.this) {
int size = futuresCountNoLock();
// Avoid iterator creation.
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 6189b38..81179ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -206,7 +206,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
public Set<IgniteTxKey> requestedKeys() {
- synchronized (sync) {
+ synchronized (this) {
int size = futuresCountNoLock();
for (int i = 0; i < size; i++) {
@@ -239,7 +239,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
@SuppressWarnings("ForLoopReplaceableByForEach")
private MiniFuture miniFuture(int miniId) {
// We iterate directly over the futs collection here to avoid copy.
- synchronized (sync) {
+ synchronized (this) {
int size = futuresCountNoLock();
// Avoid iterator creation.
@@ -694,7 +694,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
if (keyLockFut != null)
keys = new HashSet<>(keyLockFut.lockKeys);
else {
- synchronized (sync) {
+ synchronized (this) {
int size = futuresCountNoLock();
for (int i = 0; i < size; i++) {
@@ -765,9 +765,6 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa
*
*/
private static class MiniFuture extends GridFutureAdapter<GridNearTxPrepareResponse> {
- /** */
- private static final long serialVersionUID = 0L;
-
/** Receive result flag updater. */
private static final AtomicIntegerFieldUpdater<MiniFuture> RCV_RES_UPD =
AtomicIntegerFieldUpdater.newUpdater(MiniFuture.class, "rcvRes");
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
index 4a443a9..cb15bca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java
@@ -132,7 +132,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
@SuppressWarnings("ForLoopReplaceableByForEach")
private MiniFuture miniFuture(int miniId) {
// We iterate directly over the futs collection here to avoid copy.
- synchronized (sync) {
+ synchronized (this) {
int size = futuresCountNoLock();
// Avoid iterator creation.
@@ -365,9 +365,6 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA
*/
private class MiniFuture extends GridFutureAdapter<GridNearTxPrepareResponse> {
/** */
- private static final long serialVersionUID = 0L;
-
- /** */
private final int futId;
/** */
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index 1b0566b..37be0fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheCompoundIdentityFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheFuture;
@@ -46,7 +47,6 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
-import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.C1;
@@ -66,7 +66,7 @@ import static org.apache.ignite.transactions.TransactionState.UNKNOWN;
/**
*
*/
-public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFuture<IgniteInternalTx>
+public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentityFuture<IgniteInternalTx>
implements GridCacheFuture<IgniteInternalTx> {
/** */
private static final long serialVersionUID = 0L;
@@ -114,7 +114,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
this.tx = tx;
this.commit = commit;
- ignoreInterrupts(true);
+ ignoreInterrupts();
mappings = tx.mappings();
@@ -189,7 +189,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
if (!isDone()) {
FinishMiniFuture finishFut = null;
- synchronized (sync) {
+ synchronized (this) {
int size = futuresCountNoLock();
for (int i = 0; i < size; i++) {
@@ -878,9 +878,6 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
*
*/
private class FinishMiniFuture extends MinFuture {
- /** */
- private static final long serialVersionUID = 0L;
-
/** Keys. */
@GridToStringInclude
private GridDistributedTxMapping m;
@@ -926,7 +923,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu
if (!F.isEmpty(backups)) {
final CheckRemoteTxMiniFuture mini;
- synchronized (sync) {
+ synchronized (GridNearTxFinishFuture.this) {
int futId = Integer.MIN_VALUE + futuresCountNoLock();
mini = new CheckRemoteTxMiniFuture(futId, new HashSet<>(backups));
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 8be87d4..5baec99 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -3946,7 +3946,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea
private <T> IgniteInternalFuture<T> nonInterruptable(IgniteInternalFuture<T> fut) {
// Safety.
if (fut instanceof GridFutureAdapter)
- ((GridFutureAdapter)fut).ignoreInterrupts(true);
+ ((GridFutureAdapter)fut).ignoreInterrupts();
return fut;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
index f9a6353..7f1f5a2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.processors.cache.GridCacheCompoundFuture;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture;
@@ -35,7 +36,6 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.CU;
@@ -49,7 +49,7 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOO
* Common code for tx prepare in optimistic and pessimistic modes.
*/
public abstract class GridNearTxPrepareFutureAdapter extends
- GridCompoundFuture<GridNearTxPrepareResponse, IgniteInternalTx> implements GridCacheMvccFuture<IgniteInternalTx> {
+ GridCacheCompoundFuture<GridNearTxPrepareResponse, IgniteInternalTx> implements GridCacheMvccFuture<IgniteInternalTx> {
/** Logger reference. */
protected static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
index 8e224c8..d8e95b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
+import org.apache.ignite.internal.processors.cache.GridCacheFutureAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -41,7 +42,6 @@ import org.apache.ignite.internal.processors.cache.transactions.TxDeadlock;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
import org.apache.ignite.transactions.TransactionDeadlockException;
-import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -53,7 +53,7 @@ import org.jetbrains.annotations.Nullable;
/**
* Cache lock future.
*/
-public final class GridLocalLockFuture<K, V> extends GridFutureAdapter<Boolean>
+public final class GridLocalLockFuture<K, V> extends GridCacheFutureAdapter<Boolean>
implements GridCacheMvccFuture<Boolean> {
/** */
private static final long serialVersionUID = 0L;
@@ -135,7 +135,7 @@ public final class GridLocalLockFuture<K, V> extends GridFutureAdapter<Boolean>
this.filter = filter;
this.tx = tx;
- ignoreInterrupts(true);
+ ignoreInterrupts();
threadId = tx == null ? Thread.currentThread().getId() : tx.threadId();
http://git-wip-us.apache.org/repos/asf/ignite/blob/e922dda6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
index 6110e0c..4c8e34f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
@@ -71,7 +71,7 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu
assert mgr != null;
- synchronized (mux) {
+ synchronized (this) {
for (ClusterNode node : nodes)
subgrid.add(node.id());
}
@@ -87,7 +87,7 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu
Collection<ClusterNode> allNodes = cctx.discovery().allNodes();
Collection<ClusterNode> nodes;
- synchronized (mux) {
+ synchronized (this) {
nodes = F.retain(allNodes, true,
new P1<ClusterNode>() {
@Override public boolean apply(ClusterNode node) {
@@ -139,7 +139,7 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu
@Override protected void onNodeLeft(UUID nodeId) {
boolean callOnPage;
- synchronized (mux) {
+ synchronized (this) {
callOnPage = !loc && subgrid.contains(nodeId);
}
@@ -166,7 +166,7 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu
/** {@inheritDoc} */
@Override protected boolean onPage(UUID nodeId, boolean last) {
- assert Thread.holdsLock(mux);
+ assert Thread.holdsLock(this);
if (!loc) {
rcvd.add(nodeId);
@@ -192,11 +192,11 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu
/** {@inheritDoc} */
@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
@Override protected void loadPage() {
- assert !Thread.holdsLock(mux);
+ assert !Thread.holdsLock(this);
Collection<ClusterNode> nodes = null;
- synchronized (mux) {
+ synchronized (this) {
if (!isDone() && rcvd.containsAll(subgrid)) {
rcvd.clear();
@@ -211,13 +211,13 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu
/** {@inheritDoc} */
@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
@Override protected void loadAllPages() throws IgniteInterruptedCheckedException {
- assert !Thread.holdsLock(mux);
+ assert !Thread.holdsLock(this);
U.await(firstPageLatch);
Collection<ClusterNode> nodes = null;
- synchronized (mux) {
+ synchronized (this) {
if (!isDone() && !subgrid.isEmpty())
nodes = nodes();
}
@@ -230,7 +230,7 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu
* @return Nodes to send requests to.
*/
private Collection<ClusterNode> nodes() {
- assert Thread.holdsLock(mux);
+ assert Thread.holdsLock(this);
Collection<ClusterNode> nodes = new ArrayList<>(subgrid.size());