You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/08 13:16:57 UTC
[12/28] ignite git commit: ignite-comm-balance
ignite-comm-balance
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2b561f77
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2b561f77
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2b561f77
Branch: refs/heads/ignite-4371
Commit: 2b561f7754c41ac7c972224b059e94c553cea427
Parents: d6a9767
Author: sboikov <sb...@gridgain.com>
Authored: Wed Dec 7 12:30:29 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Dec 7 14:28:34 2016 +0300
----------------------------------------------------------------------
.../ignite/internal/IgniteInternalFuture.java | 11 +++++++
.../transactions/IgniteTxLocalAdapter.java | 8 +++---
.../processors/igfs/IgfsDataManager.java | 6 +++-
.../platform/compute/PlatformCompute.java | 6 ++++
.../util/future/GridFinishedFuture.java | 24 ++++++++++++++++
.../internal/util/future/GridFutureAdapter.java | 15 ++++++++--
.../util/future/GridFutureChainListener.java | 30 ++++++++++++++++++--
.../TxDeadlockDetectionNoHangsTest.java | 2 +-
8 files changed, 90 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b561f77/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
index b80a755..789556d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal;
+import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.IgniteCheckedException;
@@ -133,6 +134,16 @@ public interface IgniteInternalFuture<R> {
public <T> IgniteInternalFuture<T> chain(IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb);
/**
+ * Make a chained future to convert result of this future (when complete) into a new format.
+ * It is guaranteed that done callback will be called only ONCE.
+ *
+ * @param doneCb Done callback that is applied to this future when it finishes to produce chained future result.
+ * @param exec Executor to run callback.
+ * @return Chained future that finishes after this future completes and done callback is called.
+ */
+ public <T> IgniteInternalFuture<T> chain(IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb, Executor exec);
+
+ /**
* @return Error value if future has already been completed with error.
*/
public Throwable error();
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b561f77/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 6d21dcf..393fb1a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -391,7 +391,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Void> loadMissing(
final GridCacheContext cacheCtx,
- AffinityTopologyVersion topVer,
+ final AffinityTopologyVersion topVer,
final boolean readThrough,
boolean async,
final Collection<KeyCacheObject> keys,
@@ -472,7 +472,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
CacheObject cacheVal = cacheCtx.toCacheObject(val);
while (true) {
- GridCacheEntryEx entry = cacheCtx.cache().entryEx(key);
+ GridCacheEntryEx entry = cacheCtx.cache().entryEx(key, topVer);
try {
GridCacheVersion setVer = entry.versionedValue(cacheVal, ver, null);
@@ -1507,7 +1507,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
assert txEntry != null || readCommitted() || skipVals;
- GridCacheEntryEx e = txEntry == null ? entryEx(cacheCtx, txKey) : txEntry.cached();
+ GridCacheEntryEx e = txEntry == null ? entryEx(cacheCtx, txKey, topVer) : txEntry.cached();
if (readCommitted() || skipVals) {
cacheCtx.evicts().touch(e, topologyVersion());
@@ -1658,7 +1658,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig
IgniteTxLocalAdapter.this,
/*swap*/cacheCtx.isSwapOrOffheapEnabled(),
/*unmarshal*/true,
- /**update-metrics*/true,
+ /*update-metrics*/true,
/*event*/!skipVals,
CU.subjectId(IgniteTxLocalAdapter.this, cctx),
transformClo,
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b561f77/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
index e534800..4490a68 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsDataManager.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.igfs;
+import java.util.concurrent.Executor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
@@ -36,6 +37,7 @@ import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystemPositionedReadabl
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
@@ -325,6 +327,8 @@ public class IgfsDataManager extends IgfsManager {
IgniteInternalFuture<byte[]> fut = dataCachePrj.getAsync(key);
if (secReader != null) {
+ Executor exec = igfsCtx.kernalContext().pools().poolForPolicy(GridIoPolicy.IGFS_POOL);
+
fut = fut.chain(new CX1<IgniteInternalFuture<byte[]>, byte[]>() {
@Override public byte[] applyx(IgniteInternalFuture<byte[]> fut) throws IgniteCheckedException {
byte[] res = fut.get();
@@ -365,7 +369,7 @@ public class IgfsDataManager extends IgfsManager {
return res;
}
- });
+ }, exec);
}
else
igfsCtx.metrics().addReadBlocks(1, 0);
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b561f77/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
index 8ff15d5..5383151 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/compute/PlatformCompute.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.platform.compute;
+import java.util.concurrent.Executor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.binary.BinaryObject;
@@ -409,6 +410,11 @@ public class PlatformCompute extends PlatformAbstractTarget {
}
/** {@inheritDoc} */
+ @Override public IgniteInternalFuture chain(IgniteClosure doneCb, Executor exec) {
+ throw new UnsupportedOperationException("Chain operation is not supported.");
+ }
+
+ /** {@inheritDoc} */
@Override public Throwable error() {
return fut.error();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b561f77/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
index 6baedbd..dc63adc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFinishedFuture.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.util.future;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -152,6 +153,29 @@ public class GridFinishedFuture<T> implements IgniteInternalFuture<T> {
}
/** {@inheritDoc} */
+ @Override public <T1> IgniteInternalFuture<T1> chain(final IgniteClosure<? super IgniteInternalFuture<T>, T1> doneCb, Executor exec) {
+ final GridFutureAdapter<T1> fut = new GridFutureAdapter<>();
+
+ exec.execute(new Runnable() {
+ @Override public void run() {
+ try {
+ fut.onDone(doneCb.apply(GridFinishedFuture.this));
+ }
+ catch (GridClosureException e) {
+ fut.onDone(e.unwrap());
+ }
+ catch (RuntimeException | Error e) {
+ fut.onDone(e);
+
+ throw e;
+ }
+ }
+ });
+
+ return fut;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridFinishedFuture.class, this);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b561f77/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
index 2cd534e..c8d85cd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.util.future;
import java.util.Arrays;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import org.apache.ignite.IgniteCheckedException;
@@ -229,7 +230,13 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
/** {@inheritDoc} */
@Override public <T> IgniteInternalFuture<T> chain(final IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb) {
- return new ChainFuture<>(this, doneCb);
+ return new ChainFuture<>(this, doneCb, null);
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> IgniteInternalFuture<T> chain(final IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb,
+ Executor exec) {
+ return new ChainFuture<>(this, doneCb, exec);
}
/**
@@ -487,15 +494,17 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements
/**
* @param fut Future.
* @param doneCb Closure.
+ * @param cbExec Optional executor to run callback.
*/
ChainFuture(
GridFutureAdapter<R> fut,
- IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb
+ IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb,
+ @Nullable Executor cbExec
) {
this.fut = fut;
this.doneCb = doneCb;
- fut.listen(new GridFutureChainListener<>(this, doneCb));
+ fut.listen(new GridFutureChainListener<>(this, doneCb, cbExec));
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b561f77/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureChainListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureChainListener.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureChainListener.java
index 947b2ad..15ef555 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureChainListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureChainListener.java
@@ -17,15 +17,17 @@
package org.apache.ignite.internal.util.future;
+import java.util.concurrent.Executor;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.lang.GridClosureException;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteInClosure;
+import org.jetbrains.annotations.Nullable;
/**
* Future listener to fill chained future with converted result of the source future.
*/
-public class GridFutureChainListener<T, R> implements IgniteInClosure<IgniteInternalFuture<T>> {
+class GridFutureChainListener<T, R> implements IgniteInClosure<IgniteInternalFuture<T>> {
/** */
private static final long serialVersionUID = 0L;
@@ -35,21 +37,43 @@ public class GridFutureChainListener<T, R> implements IgniteInClosure<IgniteInte
/** Done callback. */
private final IgniteClosure<? super IgniteInternalFuture<T>, R> doneCb;
+ /** */
+ private Executor cbExec;
+
/**
* Constructs chain listener.
+ *
* @param fut Target future.
* @param doneCb Done callback.
+ * @param cbExec Optional executor to run callback.
*/
public GridFutureChainListener(
GridFutureAdapter<R> fut,
- IgniteClosure<? super IgniteInternalFuture<T>, R> doneCb
+ IgniteClosure<? super IgniteInternalFuture<T>, R> doneCb,
+ @Nullable Executor cbExec
) {
this.fut = fut;
this.doneCb = doneCb;
+ this.cbExec = cbExec;
}
/** {@inheritDoc} */
- @Override public void apply(IgniteInternalFuture<T> t) {
+ @Override public void apply(final IgniteInternalFuture<T> t) {
+ if (cbExec != null) {
+ cbExec.execute(new Runnable() {
+ @Override public void run() {
+ apply(t);
+ }
+ });
+ }
+ else
+ applyCallback(t);
+ }
+
+ /**
+ * @param t Target future.
+ */
+ private void applyCallback(IgniteInternalFuture<T> t) {
try {
fut.onDone(doneCb.apply(t));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2b561f77/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java
index c9d18eb..e9d74ff 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetectionNoHangsTest.java
@@ -211,7 +211,7 @@ public class TxDeadlockDetectionNoHangsTest extends GridCommonAbstractTest {
tx.commit();
}
catch (Exception e) {
- e.printStackTrace();
+ log.info("Ignore error: " + e);
}
}
}, NODES_CNT * 3, "tx-thread");