You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/08/01 10:37:13 UTC
[13/49] ignite git commit: IGNITE-5729 - IgniteCacheProxy instances
from with() methods are not reusable after cache restart
http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
new file mode 100644
index 0000000..b94afa1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
@@ -0,0 +1,1810 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import javax.cache.Cache;
+import javax.cache.CacheException;
+import javax.cache.configuration.CacheEntryListenerConfiguration;
+import javax.cache.configuration.Configuration;
+import javax.cache.expiry.ExpiryPolicy;
+import javax.cache.integration.CompletionListener;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.EntryProcessorResult;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCacheRestartingException;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheEntry;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cache.CacheManager;
+import org.apache.ignite.cache.CacheMetrics;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.Query;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.QueryDetailMetrics;
+import org.apache.ignite.cache.query.QueryMetrics;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.cache.query.SpiQuery;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.cache.query.TextQuery;
+import org.apache.ignite.cluster.ClusterGroup;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.AsyncSupportAdapter;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.query.CacheQuery;
+import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
+import org.apache.ignite.internal.processors.query.QueryUtils;
+import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
+import org.apache.ignite.internal.util.GridEmptyIterator;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
+import org.apache.ignite.internal.util.lang.GridCloseableIterator;
+import org.apache.ignite.internal.util.lang.GridClosureException;
+import org.apache.ignite.internal.util.lang.IgniteOutClosureX;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.CX1;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.mxbean.CacheMetricsMXBean;
+import org.apache.ignite.plugin.security.SecurityPermission;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Cache proxy implementation.
+ */
+@SuppressWarnings("unchecked")
+public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<K, V>>
+ implements IgniteCacheProxy<K, V> {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Cache context this proxy operates on; volatile, returned by {@link #context()}. */
+ private volatile GridCacheContext<K, V> ctx;
+
+ /** Internal cache that most data operations of this proxy are forwarded to. */
+ @GridToStringInclude
+ private volatile IgniteInternalCache<K, V> delegate;
+
+ /** Cache manager. NOTE(review): no assignment is visible in this part of the file - confirm where it is set. */
+ @GridToStringExclude
+ private CacheManager cacheMgr;
+
+ /** Future indicates that cache is under restarting. */
+ private final AtomicReference<GridFutureAdapter<Void>> restartFut;
+
+ /** Flag indicates that proxy is closed. */
+ private volatile boolean closed;
+
+ /**
+  * No-arg constructor needed by {@link Externalizable}; only the restart future holder is
+  * initialized (with an empty reference).
+  */
+ public IgniteCacheProxyImpl() {
+     this.restartFut = new AtomicReference<>();
+ }
+
+ /**
+  * Creates a proxy with a fresh, empty restart future holder.
+  *
+  * @param ctx Context.
+  * @param delegate Delegate.
+  * @param async Async support flag.
+  */
+ public IgniteCacheProxyImpl(
+     @NotNull GridCacheContext<K, V> ctx,
+     @NotNull IgniteInternalCache<K, V> delegate,
+     boolean async
+ ) {
+     this(ctx, delegate, new AtomicReference<GridFutureAdapter<Void>>(), async);
+ }
+
+ /**
+  * Creates a proxy that uses the given restart future holder.
+  *
+  * @param ctx Context.
+  * @param delegate Delegate.
+  * @param restartFut Holder of the future that indicates that cache is under restarting.
+  * @param async Async support flag.
+  */
+ private IgniteCacheProxyImpl(
+     @NotNull GridCacheContext<K, V> ctx,
+     @NotNull IgniteInternalCache<K, V> delegate,
+     @NotNull AtomicReference<GridFutureAdapter<Void>> restartFut,
+     boolean async
+ ) {
+     super(async);
+
+     assert ctx != null;
+     assert delegate != null;
+
+     this.restartFut = restartFut;
+
+     this.delegate = delegate;
+     this.ctx = ctx;
+ }
+
+ /**
+  * Exposes the cache context currently held by this proxy.
+  *
+  * @return Context.
+  */
+ @Override public GridCacheContext<K, V> context() {
+     final GridCacheContext<K, V> cur = ctx;
+
+     return cur;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Wraps this proxy into a {@code GatewayProtectedCacheProxy} created with a new operation context
+  * and {@code false} as the last constructor argument.
+  */
+ @Override public IgniteCacheProxy<K, V> cacheNoGate() {
+     final CacheOperationContext opCtx = new CacheOperationContext();
+
+     return new GatewayProtectedCacheProxy<>(this, opCtx, false);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Returns the result of {@code clusterMetrics()} of the cache obtained from the context.
+  */
+ @Override public CacheMetrics metrics() {
+     final GridCacheContext<K, V> cctx = ctx;
+
+     return cctx.cache().clusterMetrics();
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Returns the result of {@code clusterMetrics(grp)} of the cache obtained from the context.
+  */
+ @Override public CacheMetrics metrics(ClusterGroup grp) {
+     final GridCacheContext<K, V> cctx = ctx;
+
+     return cctx.cache().clusterMetrics(grp);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Returns the result of {@code localMetrics()} of the cache obtained from the context.
+  */
+ @Override public CacheMetrics localMetrics() {
+     final GridCacheContext<K, V> cctx = ctx;
+
+     return cctx.cache().localMetrics();
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Returns the result of {@code clusterMxBean()} of the cache obtained from the context.
+  */
+ @Override public CacheMetricsMXBean mxBean() {
+     final GridCacheContext<K, V> cctx = ctx;
+
+     return cctx.cache().clusterMxBean();
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Returns the result of {@code localMxBean()} of the cache obtained from the context.
+  */
+ @Override public CacheMetricsMXBean localMxBean() {
+     final GridCacheContext<K, V> cctx = ctx;
+
+     return cctx.cache().localMxBean();
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * The configuration object taken from the context is returned when it is an instance of the
+  * requested class.
+  *
+  * @param clazz Requested configuration class.
+  * @return Cache configuration cast to the requested class.
+  * @throws IllegalArgumentException If the actual configuration is not an instance of {@code clazz};
+  *      the message names both the requested and the actual class.
+  */
+ @Override public <C extends Configuration<K, V>> C getConfiguration(Class<C> clazz) {
+     CacheConfiguration cfg = ctx.config();
+
+     // Fail with a descriptive message so that the caller can see which class was rejected.
+     if (!clazz.isAssignableFrom(cfg.getClass()))
+         throw new IllegalArgumentException("Unsupported configuration class [requested=" + clazz.getName() +
+             ", actual=" + cfg.getClass().getName() + ']');
+
+     return clazz.cast(cfg);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Not supported by this implementation: always throws {@link UnsupportedOperationException}.
+  * NOTE(review): presumably this decorator is provided by a wrapping proxy - confirm.
+  */
+ @Override public IgniteCache<K, V> withExpiryPolicy(ExpiryPolicy plc) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Not supported by this implementation: always throws {@link UnsupportedOperationException}.
+  * NOTE(review): presumably this decorator is provided by a wrapping proxy - confirm.
+  */
+ @Override public IgniteCache<K, V> withSkipStore() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Not supported by this implementation: always throws {@link UnsupportedOperationException}.
+  * NOTE(review): presumably this decorator is provided by a wrapping proxy - confirm.
+  */
+ @Override public <K1, V1> IgniteCache<K1, V1> withKeepBinary() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Not supported by this implementation: always throws {@link UnsupportedOperationException}.
+  * NOTE(review): presumably this decorator is provided by a wrapping proxy - confirm.
+  */
+ @Override public IgniteCache<K, V> withNoRetries() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Not supported by this implementation: always throws {@link UnsupportedOperationException}.
+  * NOTE(review): presumably this decorator is provided by a wrapping proxy - confirm.
+  */
+ @Override public IgniteCache<K, V> withPartitionRecover() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * A local cache is loaded through the {@code localLoadCache*} methods, any other cache through the
+  * {@code globalLoadCache*} ones. In async mode the resulting future is passed to {@code setFuture}.
+  * Ignite exceptions are rethrown converted by {@code cacheException}.
+  */
+ @Override public void loadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) {
+     try {
+         if (!isAsync()) {
+             // Synchronous load.
+             if (ctx.cache().isLocal())
+                 ctx.cache().localLoadCache(p, args);
+             else
+                 ctx.cache().globalLoadCache(p, args);
+
+             return;
+         }
+
+         // Asynchronous load: only register the future.
+         if (ctx.cache().isLocal())
+             setFuture(ctx.cache().localLoadCacheAsync(p, args));
+         else
+             setFuture(ctx.cache().globalLoadCacheAsync(p, args));
+     }
+     catch (IgniteCheckedException | IgniteException e) {
+         throw cacheException(e);
+     }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Uses {@code localLoadCacheAsync} for a local cache and {@code globalLoadCacheAsync} otherwise.
+  */
+ @Override public IgniteFuture<Void> loadCacheAsync(@Nullable IgniteBiPredicate<K, V> p,
+     @Nullable Object... args) throws CacheException {
+     try {
+         if (!ctx.cache().isLocal())
+             return (IgniteFuture<Void>)createFuture(ctx.cache().globalLoadCacheAsync(p, args));
+
+         return (IgniteFuture<Void>)createFuture(ctx.cache().localLoadCacheAsync(p, args));
+     }
+     catch (IgniteCheckedException | IgniteException e) {
+         throw cacheException(e);
+     }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void localLoadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) {
+ try {
+ if (isAsync())
+ setFuture(delegate.localLoadCacheAsync(p, args));
+ else
+ delegate.localLoadCache(p, args);
+ }
+ catch (IgniteCheckedException | IgniteException e) {
+ throw cacheException(e);
+ }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Wraps the delegate's {@code localLoadCacheAsync} future with {@code createFuture}.
+  */
+ @Override public IgniteFuture<Void> localLoadCacheAsync(@Nullable IgniteBiPredicate<K, V> p,
+     @Nullable Object... args) throws CacheException {
+     final IgniteInternalCache<K, V> cache = delegate;
+
+     return (IgniteFuture<Void>)createFuture(cache.localLoadCacheAsync(p, args));
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public V getAndPutIfAbsent(K key, V val) throws CacheException {
+ try {
+ if (isAsync()) {
+ setFuture(delegate.getAndPutIfAbsentAsync(key, val));
+
+ return null;
+ }
+ else
+ return delegate.getAndPutIfAbsent(key, val);
+ }
+ catch (IgniteCheckedException | IgniteException e) {
+ throw cacheException(e);
+ }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Wraps the delegate's {@code getAndPutIfAbsentAsync} future with {@code createFuture}.
+  */
+ @Override public IgniteFuture<V> getAndPutIfAbsentAsync(K key, V val) throws CacheException {
+     final IgniteInternalCache<K, V> cache = delegate;
+
+     return createFuture(cache.getAndPutIfAbsentAsync(key, val));
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Equivalent to {@link #lockAll(Collection)} called with a singleton set of the key.
+  */
+ @Override public Lock lock(K key) throws CacheException {
+     final Set<K> single = Collections.singleton(key);
+
+     return lockAll(single);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Creates a {@code CacheLockImpl} from the context gateway, the delegate, a new operation context
+  * and the given keys.
+  */
+ @Override public Lock lockAll(final Collection<? extends K> keys) {
+     final CacheOperationContext opCtx = new CacheOperationContext();
+
+     return new CacheLockImpl<>(ctx.gate(), delegate, opCtx, keys);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Asks the delegate whether the key is locked by the current thread ({@code byCurrThread == true})
+  * or locked at all.
+  */
+ @Override public boolean isLocalLocked(K key, boolean byCurrThread) {
+     if (byCurrThread)
+         return delegate.isLockedByThread(key);
+
+     return delegate.isLocked(key);
+ }
+
+ /**
+  * Executes a scan query and exposes its result as a cursor.
+  * <p>
+  * When no transformer is given every row is wrapped into a {@code CacheEntryImpl}; with a
+  * transformer the rows produced by the underlying iterator are returned unchanged.
+  *
+  * @param scanQry ScanQry.
+  * @param transformer Transformer
+  * @param grp Optional cluster group.
+  * @return Cursor.
+  * @throws IgniteCheckedException If failed.
+  */
+ @SuppressWarnings("unchecked")
+ private <T, R> QueryCursor<R> query(
+     final ScanQuery scanQry,
+     @Nullable final IgniteClosure<T, R> transformer,
+     @Nullable ClusterGroup grp)
+     throws IgniteCheckedException {
+     CacheOperationContext opCtx = ctx.operationContextPerCall();
+
+     boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
+
+     IgniteBiPredicate<K, V> filter = scanQry.getFilter();
+
+     final CacheQuery<R> qry = ctx.queries().createScanQuery(filter, transformer, scanQry.getPartition(),
+         keepBinary);
+
+     // Non-positive page size means "keep the default".
+     if (scanQry.getPageSize() > 0)
+         qry.pageSize(scanQry.getPageSize());
+
+     if (grp != null)
+         qry.projection(grp);
+
+     // Rows have to be wrapped into cache entries only when there is no transformer.
+     final boolean wrapEntries = transformer == null;
+
+     final GridCloseableIterator<R> iter = ctx.kernalContext().query().executeQuery(GridCacheQueryType.SCAN,
+         ctx.name(), ctx, new IgniteOutClosureX<GridCloseableIterator<R>>() {
+             @Override public GridCloseableIterator<R> applyx() throws IgniteCheckedException {
+                 final GridCloseableIterator rows = qry.executeScanQuery();
+
+                 return new GridCloseableIteratorAdapter<R>() {
+                     @Override protected R onNext() throws IgniteCheckedException {
+                         Object row = rows.nextX();
+
+                         if (!wrapEntries)
+                             return (R)row;
+
+                         Map.Entry<K, V> e = (Map.Entry<K, V>)row;
+
+                         return (R)new CacheEntryImpl<>(e.getKey(), e.getValue());
+                     }
+
+                     @Override protected boolean onHasNext() throws IgniteCheckedException {
+                         return rows.hasNextX();
+                     }
+
+                     @Override protected void onClose() throws IgniteCheckedException {
+                         rows.close();
+                     }
+                 };
+             }
+         }, true);
+
+     return new QueryCursorImpl<>(iter);
+ }
+
+ /**
+  * Executes a {@link TextQuery} or {@link SpiQuery} and exposes the result future as a cursor of
+  * cache entries. Any other query type is rejected with a {@link CacheException}.
+  *
+  * @param filter Filter.
+  * @param grp Optional cluster group.
+  * @return Cursor.
+  * @throws IgniteCheckedException If failed.
+  */
+ @SuppressWarnings("unchecked")
+ private QueryCursor<Cache.Entry<K, V>> query(final Query filter, @Nullable ClusterGroup grp)
+ throws IgniteCheckedException {
+ final CacheQuery qry;
+
+ CacheOperationContext opCtxCall = ctx.operationContextPerCall();
+
+ boolean isKeepBinary = opCtxCall != null && opCtxCall.isKeepBinary();
+
+ final CacheQueryFuture fut;
+
+ // Full text query.
+ if (filter instanceof TextQuery) {
+ TextQuery p = (TextQuery)filter;
+
+ qry = ctx.queries().createFullTextQuery(p.getType(), p.getText(), isKeepBinary);
+
+ if (grp != null)
+ qry.projection(grp);
+
+ fut = ctx.kernalContext().query().executeQuery(GridCacheQueryType.TEXT, p.getText(), ctx,
+ new IgniteOutClosureX<CacheQueryFuture<Map.Entry<K, V>>>() {
+ @Override public CacheQueryFuture<Map.Entry<K, V>> applyx() {
+ return qry.execute();
+ }
+ }, false);
+ }
+ // SPI query: the query arguments are passed to execute().
+ else if (filter instanceof SpiQuery) {
+ qry = ctx.queries().createSpiQuery(isKeepBinary);
+
+ if (grp != null)
+ qry.projection(grp);
+
+ fut = ctx.kernalContext().query().executeQuery(GridCacheQueryType.SPI, filter.getClass().getSimpleName(),
+ ctx, new IgniteOutClosureX<CacheQueryFuture<Map.Entry<K, V>>>() {
+ @Override public CacheQueryFuture<Map.Entry<K, V>> applyx() {
+ return qry.execute(((SpiQuery)filter).getArgs());
+ }
+ }, false);
+ }
+ else {
+ // Everything else is unsupported here; SqlFieldsQuery gets a dedicated hint.
+ if (filter instanceof SqlFieldsQuery)
+ throw new CacheException("Use methods 'queryFields' and 'localQueryFields' for " +
+ SqlFieldsQuery.class.getSimpleName() + ".");
+
+ throw new CacheException("Unsupported query type: " + filter);
+ }
+
+ // Adapter with one-element look-ahead: onHasNext() fetches the next element into 'cur',
+ // onNext() hands it out and clears it.
+ return new QueryCursorImpl<>(new GridCloseableIteratorAdapter<Entry<K, V>>() {
+ /** Element fetched by {@code onHasNext()} and not yet returned by {@code onNext()}. */
+ private Cache.Entry<K, V> cur;
+
+ @Override protected Entry<K, V> onNext() throws IgniteCheckedException {
+ if (!onHasNext())
+ throw new NoSuchElementException();
+
+ Cache.Entry<K, V> e = cur;
+
+ cur = null;
+
+ return e;
+ }
+
+ @Override protected boolean onHasNext() throws IgniteCheckedException {
+ if (cur != null)
+ return true;
+
+ Object next = fut.next();
+
+ // A null element is treated as the end of the result set.
+ if (next == null)
+ return false;
+
+ // Workaround a bug: if IndexingSpi is configured future represents Iterator<Cache.Entry>
+ // instead of Iterator<Map.Entry> due to IndexingSpi interface.
+ if (next instanceof Cache.Entry)
+ cur = (Cache.Entry)next;
+ else {
+ Map.Entry e = (Map.Entry)next;
+
+ cur = new CacheEntryImpl(e.getKey(), e.getValue());
+ }
+
+ return true;
+ }
+
+ @Override protected void onClose() throws IgniteCheckedException {
+ // Closing the cursor cancels the underlying query future.
+ fut.cancel();
+ }
+ });
+ }
+
+ /**
+ * @param loc Enforce local.
+ * @return Local node cluster group.
+ */
+ private ClusterGroup projection(boolean loc) {
+ if (loc || ctx.isLocal() || ctx.isReplicatedAffinityNode())
+ return ctx.kernalContext().grid().cluster().forLocal();
+
+ if (ctx.isReplicated())
+ return ctx.kernalContext().grid().cluster().forDataNodes(ctx.name()).forRandom();
+
+ return null;
+ }
+
+ /**
+  * Executes continuous query.
+  * <p>
+  * The continuous routine is registered first and the optional initial query is executed afterwards.
+  * If the initial query fails, the already registered routine is stopped before the failure is
+  * rethrown, so that a failed call does not leave the listener registered. Closing the returned
+  * cursor closes the initial query cursor (if any) and always stops the routine, even when closing
+  * the initial cursor fails.
+  *
+  * @param qry Query.
+  * @param loc Local flag.
+  * @param keepBinary Keep binary flag.
+  * @return Initial iteration cursor.
+  * @throws IgniteException If the query is misconfigured (nested continuous query as initial query,
+  *      missing local listener, both remote filter and remote filter factory set).
+  */
+ @SuppressWarnings("unchecked")
+ private QueryCursor<Cache.Entry<K, V>> queryContinuous(ContinuousQuery qry, boolean loc, boolean keepBinary) {
+     if (qry.getInitialQuery() instanceof ContinuousQuery)
+         throw new IgniteException("Initial predicate for continuous query can't be an instance of another " +
+             "continuous query. Use SCAN or SQL query for initial iteration.");
+
+     if (qry.getLocalListener() == null)
+         throw new IgniteException("Mandatory local listener is not set for the query: " + qry);
+
+     if (qry.getRemoteFilter() != null && qry.getRemoteFilterFactory() != null)
+         throw new IgniteException("Should be used either RemoterFilter or RemoteFilterFactory.");
+
+     try {
+         final UUID routineId = ctx.continuousQueries().executeQuery(
+             qry.getLocalListener(),
+             qry.getRemoteFilter(),
+             qry.getRemoteFilterFactory(),
+             qry.getPageSize(),
+             qry.getTimeInterval(),
+             qry.isAutoUnsubscribe(),
+             loc,
+             keepBinary,
+             qry.isIncludeExpired());
+
+         final QueryCursor<Cache.Entry<K, V>> cur;
+
+         try {
+             cur = qry.getInitialQuery() != null ? query(qry.getInitialQuery()) : null;
+         }
+         catch (RuntimeException | Error e) {
+             // Initial query failed: the routine is already running and nobody holds a cursor
+             // that could stop it, so stop it here before propagating the failure.
+             try {
+                 ctx.kernalContext().continuous().stopRoutine(routineId).get();
+             }
+             catch (IgniteCheckedException | RuntimeException stopErr) {
+                 e.addSuppressed(stopErr);
+             }
+
+             throw e;
+         }
+
+         return new QueryCursor<Cache.Entry<K, V>>() {
+             @Override public Iterator<Cache.Entry<K, V>> iterator() {
+                 return cur != null ? cur.iterator() : new GridEmptyIterator<Cache.Entry<K, V>>();
+             }
+
+             @Override public List<Cache.Entry<K, V>> getAll() {
+                 return cur != null ? cur.getAll() : Collections.<Cache.Entry<K, V>>emptyList();
+             }
+
+             @Override public void close() {
+                 try {
+                     if (cur != null)
+                         cur.close();
+                 }
+                 finally {
+                     // Stop the routine even if closing the initial cursor failed.
+                     try {
+                         ctx.kernalContext().continuous().stopRoutine(routineId).get();
+                     }
+                     catch (IgniteCheckedException e) {
+                         throw U.convertException(e);
+                     }
+                 }
+             }
+         };
+     }
+     catch (IgniteCheckedException e) {
+         throw U.convertException(e);
+     }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Forwards to the generic {@code query(Query)} method and casts the resulting cursor.
+  */
+ @SuppressWarnings("unchecked")
+ @Override public FieldsQueryCursor<List<?>> query(SqlFieldsQuery qry) {
+     final Query rawQry = qry;
+
+     return (FieldsQueryCursor<List<?>>)query(rawQry);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * After the security check, validation and argument conversion the query is dispatched by its type:
+  * continuous, SQL, SQL fields, scan, and finally the text/SPI path. A {@link CacheException} is
+  * rethrown as is, any other exception is wrapped into a {@link CacheException}.
+  */
+ @SuppressWarnings("unchecked")
+ @Override public <R> QueryCursor<R> query(Query<R> qry) {
+     A.notNull(qry, "qry");
+
+     try {
+         ctx.checkSecurity(SecurityPermission.CACHE_READ);
+
+         validate(qry);
+
+         convertToBinary(qry);
+
+         CacheOperationContext opCtx = ctx.operationContextPerCall();
+
+         boolean keepBinary = opCtx != null && opCtx.isKeepBinary();
+
+         if (qry instanceof ContinuousQuery) {
+             ContinuousQuery<K, V> contQry = (ContinuousQuery<K, V>)qry;
+
+             return (QueryCursor<R>)queryContinuous(contQry, qry.isLocal(), keepBinary);
+         }
+
+         if (qry instanceof SqlQuery)
+             return (QueryCursor<R>)ctx.kernalContext().query().querySql(ctx, (SqlQuery)qry, keepBinary);
+
+         if (qry instanceof SqlFieldsQuery)
+             return (FieldsQueryCursor<R>)ctx.kernalContext().query().querySqlFields(ctx, (SqlFieldsQuery)qry,
+                 keepBinary);
+
+         // Scan and text/SPI queries run on the projection chosen for the local flag.
+         ClusterGroup grp = projection(qry.isLocal());
+
+         if (qry instanceof ScanQuery)
+             return query((ScanQuery)qry, null, grp);
+
+         return (QueryCursor<R>)query(qry, grp);
+     }
+     catch (CacheException e) {
+         throw e;
+     }
+     catch (Exception e) {
+         throw new CacheException(e);
+     }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Only {@link ScanQuery} is accepted; other query types cause an
+  * {@link UnsupportedOperationException}. A {@link CacheException} is rethrown as is, any other
+  * exception is wrapped into a {@link CacheException}.
+  */
+ @Override public <T, R> QueryCursor<R> query(Query<T> qry, IgniteClosure<T, R> transformer) {
+     A.notNull(qry, "qry");
+     A.notNull(transformer, "transformer");
+
+     final boolean scan = qry instanceof ScanQuery;
+
+     if (!scan)
+         throw new UnsupportedOperationException("Transformers are supported only for SCAN queries.");
+
+     try {
+         ctx.checkSecurity(SecurityPermission.CACHE_READ);
+
+         validate(qry);
+
+         ScanQuery<K, V> scanQry = (ScanQuery<K, V>)qry;
+
+         return query(scanQry, transformer, projection(qry.isLocal()));
+     }
+     catch (CacheException e) {
+         throw e;
+     }
+     catch (Exception e) {
+         throw new CacheException(e);
+     }
+ }
+
+ /**
+  * Converts the arguments of SQL, SPI and SQL fields queries to binary form when the context
+  * reports that the binary marshaller is used. Other query types are left untouched.
+  *
+  * @param qry Query.
+  */
+ private void convertToBinary(final Query qry) {
+     if (!ctx.binaryMarshaller())
+         return;
+
+     if (qry instanceof SqlQuery)
+         convertToBinary(((SqlQuery)qry).getArgs());
+     else if (qry instanceof SpiQuery)
+         convertToBinary(((SpiQuery)qry).getArgs());
+     else if (qry instanceof SqlFieldsQuery)
+         convertToBinary(((SqlFieldsQuery)qry).getArgs());
+ }
+
+ /**
+  * Replaces every element of the array, in place, with the result of {@code toBinary} for it.
+  * A {@code null} array is ignored.
+  *
+  * @param args Arguments.
+  */
+ private void convertToBinary(final Object[] args) {
+     if (args != null) {
+         int idx = 0;
+
+         while (idx < args.length) {
+             args[idx] = ctx.cacheObjects().binary().toBinary(args[idx]);
+
+             idx++;
+         }
+     }
+ }
+
+ /**
+ * Checks query.
+ *
+ * @param qry Query
+ * @throws CacheException If query indexing disabled for sql query.
+ */
+ private void validate(Query qry) {
+ if (!QueryUtils.isEnabled(ctx.config()) && !(qry instanceof ScanQuery) &&
+ !(qry instanceof ContinuousQuery) && !(qry instanceof SpiQuery))
+ throw new CacheException("Indexing is disabled for cache: " + ctx.cache().name() +
+ ". Use setIndexedTypes or setTypeMetadata methods on CacheConfiguration to enable.");
+
+ if (!ctx.kernalContext().query().moduleEnabled() &&
+ (qry instanceof SqlQuery || qry instanceof SqlFieldsQuery || qry instanceof TextQuery))
+ throw new CacheException("Failed to execute query. Add module 'ignite-indexing' to the classpath " +
+ "of all Ignite nodes.");
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Forwards to the delegate; Ignite exceptions are converted by {@code cacheException}.
+  */
+ @Override public Iterable<Cache.Entry<K, V>> localEntries(CachePeekMode... peekModes) throws CacheException {
+     final IgniteInternalCache<K, V> cache = delegate;
+
+     try {
+         return cache.localEntries(peekModes);
+     }
+     catch (IgniteCheckedException | IgniteException e) {
+         throw cacheException(e);
+     }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Metrics are taken from the query manager of the delegate's context.
+  */
+ @Override public QueryMetrics queryMetrics() {
+     final IgniteInternalCache<K, V> cache = delegate;
+
+     return cache.context().queries().metrics();
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Calls {@code resetMetrics()} on the query manager of the delegate's context.
+  */
+ @Override public void resetQueryMetrics() {
+     final IgniteInternalCache<K, V> cache = delegate;
+
+     cache.context().queries().resetMetrics();
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Detail metrics are taken from the query manager of the delegate's context.
+  */
+ @Override public Collection<? extends QueryDetailMetrics> queryDetailMetrics() {
+     final IgniteInternalCache<K, V> cache = delegate;
+
+     return cache.context().queries().detailMetrics();
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Calls {@code resetDetailMetrics()} on the query manager of the delegate's context.
+  */
+ @Override public void resetQueryDetailMetrics() {
+     final IgniteInternalCache<K, V> cache = delegate;
+
+     cache.context().queries().resetDetailMetrics();
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Forwards to {@code evictAll(keys)} of the delegate.
+  */
+ @Override public void localEvict(Collection<? extends K> keys) {
+     final IgniteInternalCache<K, V> cache = delegate;
+
+     cache.evictAll(keys);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Forwards to the delegate with {@code null} as the third argument; Ignite exceptions are
+  * converted by {@code cacheException}.
+  */
+ @Nullable @Override public V localPeek(K key, CachePeekMode... peekModes) {
+     final IgniteInternalCache<K, V> cache = delegate;
+
+     try {
+         return cache.localPeek(key, peekModes, null);
+     }
+     catch (IgniteException | IgniteCheckedException e) {
+         throw cacheException(e);
+     }
+ }
+
+ /** {@inheritDoc} */
+ @Override public int size(CachePeekMode... peekModes) throws CacheException {
+ try {
+ if (isAsync()) {
+ setFuture(delegate.sizeAsync(peekModes));
+
+ return 0;
+ }
+ else
+ return delegate.size(peekModes);
+ }
+ catch (IgniteCheckedException | IgniteException e) {
+ throw cacheException(e);
+ }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Wraps the delegate's {@code sizeAsync} future with {@code createFuture}.
+  */
+ @Override public IgniteFuture<Integer> sizeAsync(CachePeekMode... peekModes) throws CacheException {
+     final IgniteInternalCache<K, V> cache = delegate;
+
+     return createFuture(cache.sizeAsync(peekModes));
+ }
+
+ /** {@inheritDoc} */
+ @Override public long sizeLong(CachePeekMode... peekModes) throws CacheException {
+ try {
+ if (isAsync()) {
+ setFuture(delegate.sizeLongAsync(peekModes));
+
+ return 0;
+ }
+ else
+ return delegate.sizeLong(peekModes);
+ }
+ catch (IgniteCheckedException | IgniteException e) {
+ throw cacheException(e);
+ }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Wraps the delegate's {@code sizeLongAsync} future with {@code createFuture}.
+  */
+ @Override public IgniteFuture<Long> sizeLongAsync(CachePeekMode... peekModes) throws CacheException {
+     final IgniteInternalCache<K, V> cache = delegate;
+
+     return createFuture(cache.sizeLongAsync(peekModes));
+ }
+
+ /** {@inheritDoc} */
+ @Override public long sizeLong(int part, CachePeekMode... peekModes) throws CacheException {
+ try {
+ if (isAsync()) {
+ setFuture(delegate.sizeLongAsync(part, peekModes));
+
+ return 0;
+ }
+ else
+ return delegate.sizeLong(part, peekModes);
+ }
+ catch (IgniteCheckedException | IgniteException e) {
+ throw cacheException(e);
+ }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Wraps the delegate's per-partition {@code sizeLongAsync} future with {@code createFuture}.
+  */
+ @Override public IgniteFuture<Long> sizeLongAsync(int part, CachePeekMode... peekModes) throws CacheException {
+     final IgniteInternalCache<K, V> cache = delegate;
+
+     return createFuture(cache.sizeLongAsync(part, peekModes));
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Forwards to the delegate; Ignite exceptions are converted by {@code cacheException}.
+  */
+ @Override public int localSize(CachePeekMode... peekModes) {
+     final IgniteInternalCache<K, V> cache = delegate;
+
+     try {
+         return cache.localSize(peekModes);
+     }
+     catch (IgniteCheckedException | IgniteException e) {
+         throw cacheException(e);
+     }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Forwards to the delegate; Ignite exceptions are converted by {@code cacheException}.
+  */
+ @Override public long localSizeLong(CachePeekMode... peekModes) {
+     final IgniteInternalCache<K, V> cache = delegate;
+
+     try {
+         return cache.localSizeLong(peekModes);
+     }
+     catch (IgniteCheckedException | IgniteException e) {
+         throw cacheException(e);
+     }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Forwards to the delegate for the given partition; Ignite exceptions are converted by
+  * {@code cacheException}.
+  */
+ @Override public long localSizeLong(int part, CachePeekMode... peekModes) {
+     final IgniteInternalCache<K, V> cache = delegate;
+
+     try {
+         return cache.localSizeLong(part, peekModes);
+     }
+     catch (IgniteCheckedException | IgniteException e) {
+         throw cacheException(e);
+     }
+ }
+
+ /** {@inheritDoc} */
+ @Override public V get(K key) {
+ try {
+ if (isAsync()) {
+ setFuture(delegate.getAsync(key));
+
+ return null;
+ }
+ else
+ return delegate.get(key);
+ }
+ catch (IgniteCheckedException | IgniteException e) {
+ throw cacheException(e);
+ }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Wraps the delegate's {@code getAsync} future with {@code createFuture}.
+  */
+ @Override public IgniteFuture<V> getAsync(K key) {
+     final IgniteInternalCache<K, V> cache = delegate;
+
+     return createFuture(cache.getAsync(key));
+ }
+
+ /** {@inheritDoc} */
+ @Override public CacheEntry<K, V> getEntry(K key) {
+ try {
+ if (isAsync()) {
+ setFuture(delegate.getEntryAsync(key));
+
+ return null;
+ }
+ else
+ return delegate.getEntry(key);
+ }
+ catch (IgniteCheckedException | IgniteException e) {
+ throw cacheException(e);
+ }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Wraps the delegate's {@code getEntryAsync} future with {@code createFuture}.
+  */
+ @Override public IgniteFuture<CacheEntry<K, V>> getEntryAsync(K key) {
+     final IgniteInternalCache<K, V> cache = delegate;
+
+     return createFuture(cache.getEntryAsync(key));
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<K, V> getAll(Set<? extends K> keys) {
+ try {
+ if (isAsync()) {
+ setFuture(delegate.getAllAsync(keys));
+
+ return null;
+ }
+ else
+ return delegate.getAll(keys);
+ }
+ catch (IgniteCheckedException | IgniteException e) {
+ throw cacheException(e);
+ }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Wraps the delegate's {@code getAllAsync} future with {@code createFuture}.
+  */
+ @Override public IgniteFuture<Map<K, V>> getAllAsync(Set<? extends K> keys) {
+     final IgniteInternalCache<K, V> cache = delegate;
+
+     return createFuture(cache.getAllAsync(keys));
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<CacheEntry<K, V>> getEntries(Set<? extends K> keys) {
+ try {
+ if (isAsync()) {
+ setFuture(delegate.getEntriesAsync(keys));
+
+ return null;
+ }
+ else
+ return delegate.getEntries(keys);
+ }
+ catch (IgniteCheckedException | IgniteException e) {
+ throw cacheException(e);
+ }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Wraps the delegate's {@code getEntriesAsync} future with {@code createFuture}.
+  */
+ @Override public IgniteFuture<Collection<CacheEntry<K, V>>> getEntriesAsync(Set<? extends K> keys) {
+     final IgniteInternalCache<K, V> cache = delegate;
+
+     return createFuture(cache.getEntriesAsync(keys));
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<K, V> getAllOutTx(Set<? extends K> keys) {
+ try {
+ if (isAsync()) {
+ setFuture(delegate.getAllOutTxAsync(keys));
+
+ return null;
+ }
+ else
+ return delegate.getAllOutTx(keys);
+ }
+ catch (IgniteCheckedException | IgniteException e) {
+ throw cacheException(e);
+ }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Wraps the delegate's {@code getAllOutTxAsync} future with {@code createFuture}.
+  */
+ @Override public IgniteFuture<Map<K, V>> getAllOutTxAsync(Set<? extends K> keys) {
+     final IgniteInternalCache<K, V> cache = delegate;
+
+     return createFuture(cache.getAllOutTxAsync(keys));
+ }
+
+ /**
+ * @param keys Keys.
+ * @return Values map.
+ */
+ public Map<K, V> getAll(Collection<? extends K> keys) {
+ try {
+ if (isAsync()) {
+ setFuture(delegate.getAllAsync(keys));
+
+ return null;
+ }
+ else
+ return delegate.getAll(keys);
+ }
+ catch (IgniteCheckedException | IgniteException e) {
+ throw cacheException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean containsKey(K key) {
+ if (isAsync()) {
+ setFuture(delegate.containsKeyAsync(key));
+
+ return false;
+ }
+ else
+ return delegate.containsKey(key);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Wraps the delegate's {@code containsKeyAsync} future with {@code createFuture}.
+  */
+ @Override public IgniteFuture<Boolean> containsKeyAsync(K key) {
+     final IgniteInternalCache<K, V> cache = delegate;
+
+     return createFuture(cache.containsKeyAsync(key));
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean containsKeys(Set<? extends K> keys) {
+ if (isAsync()) {
+ setFuture(delegate.containsKeysAsync(keys));
+
+ return false;
+ }
+ else
+ return delegate.containsKeys(keys);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Wraps the delegate's {@code containsKeysAsync} future with {@code createFuture}.
+  */
+ @Override public IgniteFuture<Boolean> containsKeysAsync(Set<? extends K> keys) {
+     final IgniteInternalCache<K, V> cache = delegate;
+
+     return createFuture(cache.containsKeysAsync(keys));
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Starts loading through the cache taken from the context. When a completion listener is given it
+  * is notified from a listener attached to the load future: {@code onCompletion()} on success,
+  * {@code onException(...)} with the converted exception when the future fails with an
+  * {@link IgniteCheckedException}.
+  */
+ @Override public void loadAll(
+     Set<? extends K> keys,
+     boolean replaceExisting,
+     @Nullable final CompletionListener completionLsnr
+ ) {
+     IgniteInternalFuture<?> loadFut = ctx.cache().loadAll(keys, replaceExisting);
+
+     // Nobody to notify.
+     if (completionLsnr == null)
+         return;
+
+     loadFut.listen(new CI1<IgniteInternalFuture<?>>() {
+         @Override public void apply(IgniteInternalFuture<?> done) {
+             try {
+                 done.get();
+
+                 completionLsnr.onCompletion();
+             }
+             catch (IgniteCheckedException e) {
+                 completionLsnr.onException(cacheException(e));
+             }
+         }
+     });
+ }
+
+ /** {@inheritDoc} */
+ @Override public void put(K key, V val) {
+ try {
+ if (isAsync())
+ setFuture(putAsync0(key, val));
+ else
+ delegate.put(key, val);
+ }
+ catch (IgniteCheckedException | IgniteException e) {
+ throw cacheException(e);
+ }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Wraps the internal future produced by {@code putAsync0} with {@code createFuture}.
+  */
+ @Override public IgniteFuture<Void> putAsync(K key, V val) {
+     final IgniteInternalFuture<Void> putFut = putAsync0(key, val);
+
+     return createFuture(putFut);
+ }
+
+ /**
+  * Put async internal operation implementation: chains the delegate's boolean put future into a
+  * {@code Void} future. The chained closure waits for the put result, discards it and rethrows
+  * runtime failures wrapped into {@code GridClosureException}.
+  *
+  * @param key Key.
+  * @param val Value.
+  * @return Internal future.
+  */
+ private IgniteInternalFuture<Void> putAsync0(K key, V val) {
+     return delegate.putAsync(key, val).chain(new CX1<IgniteInternalFuture<Boolean>, Void>() {
+         @Override public Void applyx(IgniteInternalFuture<Boolean> putFut) throws IgniteCheckedException {
+             try {
+                 putFut.get();
+
+                 return null;
+             }
+             catch (RuntimeException e) {
+                 throw new GridClosureException(e);
+             }
+         }
+     });
+ }
+
+ /** {@inheritDoc} */
+ @Override public V getAndPut(K key, V val) {
+ try {
+ if (isAsync()) {
+ setFuture(delegate.getAndPutAsync(key, val));
+
+ return null;
+ }
+ else
+ return delegate.getAndPut(key, val);
+ }
+ catch (IgniteCheckedException | IgniteException e) {
+ throw cacheException(e);
+ }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Wraps the delegate's {@code getAndPutAsync} future with {@code createFuture}.
+  */
+ @Override public IgniteFuture<V> getAndPutAsync(K key, V val) {
+     final IgniteInternalCache<K, V> cache = delegate;
+
+     return createFuture(cache.getAndPutAsync(key, val));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void putAll(Map<? extends K, ? extends V> map) {
+ try {
+ if (isAsync())
+ setFuture(delegate.putAllAsync(map));
+ else
+ delegate.putAll(map);
+ }
+ catch (IgniteCheckedException | IgniteException e) {
+ throw cacheException(e);
+ }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Wraps the delegate's {@code putAllAsync} future with {@code createFuture}.
+  */
+ @Override public IgniteFuture<Void> putAllAsync(Map<? extends K, ? extends V> map) {
+     final IgniteInternalCache<K, V> cache = delegate;
+
+     return (IgniteFuture<Void>)createFuture(cache.putAllAsync(map));
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean putIfAbsent(K key, V val) {
+ try {
+ if (isAsync()) {
+ setFuture(delegate.putIfAbsentAsync(key, val));
+
+ return false;
+ }
+ else
+ return delegate.putIfAbsent(key, val);
+ }
+ catch (IgniteCheckedException | IgniteException e) {
+ throw cacheException(e);
+ }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Wraps the delegate's {@code putIfAbsentAsync} future with {@code createFuture}.
+  */
+ @Override public IgniteFuture<Boolean> putIfAbsentAsync(K key, V val) {
+     final IgniteInternalCache<K, V> cache = delegate;
+
+     return createFuture(cache.putIfAbsentAsync(key, val));
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean remove(K key) {
+ try {
+ if (isAsync()) {
+ setFuture(delegate.removeAsync(key));
+
+ return false;
+ }
+ else
+ return delegate.remove(key);
+ }
+ catch (IgniteCheckedException | IgniteException e) {
+ throw cacheException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<Boolean> removeAsync(K key) {
+     // Wrap the delegate's internal future into the public future type.
+     final IgniteInternalFuture<Boolean> internalFut = delegate.removeAsync(key);
+
+     return createFuture(internalFut);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean remove(K key, V oldVal) {
+ try {
+ if (isAsync()) {
+ setFuture(delegate.removeAsync(key, oldVal));
+
+ return false;
+ }
+ else
+ return delegate.remove(key, oldVal);
+ }
+ catch (IgniteCheckedException | IgniteException e) {
+ throw cacheException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<Boolean> removeAsync(K key, V oldVal) {
+     // Wrap the delegate's internal future into the public future type.
+     final IgniteInternalFuture<Boolean> internalFut = delegate.removeAsync(key, oldVal);
+
+     return createFuture(internalFut);
+ }
+
+ /** {@inheritDoc} */
+ @Override public V getAndRemove(K key) {
+ try {
+ if (isAsync()) {
+ setFuture(delegate.getAndRemoveAsync(key));
+
+ return null;
+ }
+ else
+ return delegate.getAndRemove(key);
+ }
+ catch (IgniteCheckedException | IgniteException e) {
+ throw cacheException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<V> getAndRemoveAsync(K key) {
+     // Wrap the delegate's internal future into the public future type.
+     final IgniteInternalFuture<V> internalFut = delegate.getAndRemoveAsync(key);
+
+     return createFuture(internalFut);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean replace(K key, V oldVal, V newVal) {
+ try {
+ if (isAsync()) {
+ setFuture(delegate.replaceAsync(key, oldVal, newVal));
+
+ return false;
+ }
+ else
+ return delegate.replace(key, oldVal, newVal);
+ }
+ catch (IgniteCheckedException | IgniteException e) {
+ throw cacheException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<Boolean> replaceAsync(K key, V oldVal, V newVal) {
+     // Wrap the delegate's internal future into the public future type.
+     final IgniteInternalFuture<Boolean> internalFut = delegate.replaceAsync(key, oldVal, newVal);
+
+     return createFuture(internalFut);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean replace(K key, V val) {
+ try {
+ if (isAsync()) {
+ setFuture(delegate.replaceAsync(key, val));
+
+ return false;
+ }
+ else
+ return delegate.replace(key, val);
+ }
+ catch (IgniteCheckedException | IgniteException e) {
+ throw cacheException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<Boolean> replaceAsync(K key, V val) {
+     // Wrap the delegate's internal future into the public future type.
+     final IgniteInternalFuture<Boolean> internalFut = delegate.replaceAsync(key, val);
+
+     return createFuture(internalFut);
+ }
+
+ /** {@inheritDoc} */
+ @Override public V getAndReplace(K key, V val) {
+ try {
+ if (isAsync()) {
+ setFuture(delegate.getAndReplaceAsync(key, val));
+
+ return null;
+ }
+ else
+ return delegate.getAndReplace(key, val);
+ }
+ catch (IgniteCheckedException | IgniteException e) {
+ throw cacheException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<V> getAndReplaceAsync(K key, V val) {
+     // Wrap the delegate's internal future into the public future type.
+     final IgniteInternalFuture<V> internalFut = delegate.getAndReplaceAsync(key, val);
+
+     return createFuture(internalFut);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeAll(Set<? extends K> keys) {
+ try {
+ if (isAsync())
+ setFuture(delegate.removeAllAsync(keys));
+ else
+ delegate.removeAll(keys);
+ }
+ catch (IgniteCheckedException | IgniteException e) {
+ throw cacheException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<Void> removeAllAsync(Set<? extends K> keys) {
+     final IgniteInternalFuture<?> internalFut = delegate.removeAllAsync(keys);
+
+     // The wrapped future is exposed to callers as a Void-typed future.
+     return (IgniteFuture<Void>)createFuture(internalFut);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void removeAll() {
+ try {
+ if (isAsync())
+ setFuture(delegate.removeAllAsync());
+ else
+ delegate.removeAll();
+ }
+ catch (IgniteCheckedException | IgniteException e) {
+ throw cacheException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<Void> removeAllAsync() {
+     final IgniteInternalFuture<?> internalFut = delegate.removeAllAsync();
+
+     // The wrapped future is exposed to callers as a Void-typed future.
+     return (IgniteFuture<Void>)createFuture(internalFut);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void clear(K key) {
+ try {
+ if (isAsync())
+ setFuture(delegate.clearAsync(key));
+ else
+ delegate.clear(key);
+ }
+ catch (IgniteCheckedException | IgniteException e) {
+ throw cacheException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<Void> clearAsync(K key) {
+     final IgniteInternalFuture<?> internalFut = delegate.clearAsync(key);
+
+     // The wrapped future is exposed to callers as a Void-typed future.
+     return (IgniteFuture<Void>)createFuture(internalFut);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void clearAll(Set<? extends K> keys) {
+ try {
+ if (isAsync())
+ setFuture(delegate.clearAllAsync(keys));
+ else
+ delegate.clearAll(keys);
+ }
+ catch (IgniteCheckedException | IgniteException e) {
+ throw cacheException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<Void> clearAllAsync(Set<? extends K> keys) {
+     final IgniteInternalFuture<?> internalFut = delegate.clearAllAsync(keys);
+
+     // The wrapped future is exposed to callers as a Void-typed future.
+     return (IgniteFuture<Void>)createFuture(internalFut);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void clear() {
+ try {
+ if (isAsync())
+ setFuture(delegate.clearAsync());
+ else
+ delegate.clear();
+ }
+ catch (IgniteCheckedException | IgniteException e) {
+ throw cacheException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFuture<Void> clearAsync() {
+     final IgniteInternalFuture<?> internalFut = delegate.clearAsync();
+
+     // The wrapped future is exposed to callers as a Void-typed future.
+     return (IgniteFuture<Void>)createFuture(internalFut);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Implemented by a single {@code clearLocally} call on the delegate.
+  */
+ @Override public void localClear(K key) {
+     delegate.clearLocally(key);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Keys are cleared one at a time, in the iteration order of {@code keys}.
+  */
+ @Override public void localClearAll(Set<? extends K> keys) {
+     for (Iterator<? extends K> it = keys.iterator(); it.hasNext(); )
+         delegate.clearLocally(it.next());
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> T invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... args)
+ throws EntryProcessorException {
+ try {
+ if (isAsync()) {
+ setFuture(invokeAsync0(key, entryProcessor, args));
+
+ return null;
+ }
+ else {
+ EntryProcessorResult<T> res = delegate.invoke(key, entryProcessor, args);
+
+ return res != null ? res.get() : null;
+ }
+ }
+ catch (IgniteCheckedException | IgniteException e) {
+ throw cacheException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> IgniteFuture<T> invokeAsync(K key, EntryProcessor<K, V, T> entryProcessor,
+     Object... args) {
+     // The helper already unwraps the EntryProcessorResult; only the future type is adapted here.
+     final IgniteInternalFuture<T> internalFut = invokeAsync0(key, entryProcessor, args);
+
+     return createFuture(internalFut);
+ }
+
+ /**
+ * Invoke async operation internal implementation.
+ *
+ * @param key Key.
+ * @param entryProcessor Processor.
+ * @param args Arguments.
+ * @return Internal future.
+ */
+ private <T> IgniteInternalFuture<T> invokeAsync0(K key, EntryProcessor<K, V, T> entryProcessor, Object[] args) {
+ IgniteInternalFuture<EntryProcessorResult<T>> fut = delegate.invokeAsync(key, entryProcessor, args);
+
+ return fut.chain(new CX1<IgniteInternalFuture<EntryProcessorResult<T>>, T>() {
+ @Override public T applyx(IgniteInternalFuture<EntryProcessorResult<T>> fut1)
+ throws IgniteCheckedException {
+ try {
+ EntryProcessorResult<T> res = fut1.get();
+
+ return res != null ? res.get() : null;
+ }
+ catch (RuntimeException e) {
+ throw new GridClosureException(e);
+ }
+ }
+ });
+ }
+
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Forwards to the {@code EntryProcessor} overload of {@code invoke}.
+  */
+ @Override public <T> T invoke(K key, CacheEntryProcessor<K, V, T> entryProcessor, Object... args)
+     throws EntryProcessorException {
+     // Typing the processor as EntryProcessor selects the other overload.
+     final EntryProcessor<K, V, T> proc = (EntryProcessor<K, V, T>)entryProcessor;
+
+     return invoke(key, proc, args);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Forwards to the {@code EntryProcessor} overload of {@code invokeAsync}.
+  */
+ @Override public <T> IgniteFuture<T> invokeAsync(K key, CacheEntryProcessor<K, V, T> entryProcessor,
+     Object... args) {
+     // Typing the processor as EntryProcessor selects the other overload.
+     final EntryProcessor<K, V, T> proc = (EntryProcessor<K, V, T>)entryProcessor;
+
+     return invokeAsync(key, proc, args);
+ }
+
+ /**
+  * Invokes an entry processor, passing the given topology version through to the delegate.
+  * Asynchronous mode is not supported by this overload.
+  *
+  * @param topVer Locked topology version.
+  * @param key Key.
+  * @param entryProcessor Entry processor.
+  * @param args Arguments.
+  * @return Invoke result, or {@code null} if the delegate returned no result object.
+  * @throws UnsupportedOperationException If {@code isAsync()} is {@code true}.
+  */
+ public <T> T invoke(@Nullable AffinityTopologyVersion topVer,
+     K key,
+     EntryProcessor<K, V, T> entryProcessor,
+     Object... args) {
+     try {
+         if (isAsync())
+             throw new UnsupportedOperationException();
+
+         final EntryProcessorResult<T> procRes = delegate.invoke(topVer, key, entryProcessor, args);
+
+         if (procRes == null)
+             return null;
+
+         return procRes.get();
+     }
+     catch (IgniteCheckedException | IgniteException err) {
+         throw cacheException(err);
+     }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * If {@code isAsync()} is {@code true}, the operation is only started: its future is stored through
+  * {@code setFuture} and {@code null} is returned.
+  */
+ @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys,
+     EntryProcessor<K, V, T> entryProcessor,
+     Object... args) {
+     try {
+         if (!isAsync())
+             return delegate.invokeAll(keys, entryProcessor, args);
+
+         setFuture(delegate.invokeAllAsync(keys, entryProcessor, args));
+
+         return null;
+     }
+     catch (IgniteCheckedException | IgniteException err) {
+         throw cacheException(err);
+     }
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys,
+     EntryProcessor<K, V, T> entryProcessor, Object... args) {
+     // Wrap the delegate's internal future into the public future type.
+     final IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> internalFut =
+         delegate.invokeAllAsync(keys, entryProcessor, args);
+
+     return createFuture(internalFut);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * If {@code isAsync()} is {@code true}, the operation is only started: its future is stored through
+  * {@code setFuture} and {@code null} is returned.
+  */
+ @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys,
+     CacheEntryProcessor<K, V, T> entryProcessor,
+     Object... args) {
+     try {
+         if (!isAsync())
+             return delegate.invokeAll(keys, entryProcessor, args);
+
+         setFuture(delegate.invokeAllAsync(keys, entryProcessor, args));
+
+         return null;
+     }
+     catch (IgniteCheckedException | IgniteException err) {
+         throw cacheException(err);
+     }
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys,
+     CacheEntryProcessor<K, V, T> entryProcessor, Object... args) {
+     // Wrap the delegate's internal future into the public future type.
+     final IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> internalFut =
+         delegate.invokeAllAsync(keys, entryProcessor, args);
+
+     return createFuture(internalFut);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * If {@code isAsync()} is {@code true}, the operation is only started: its future is stored through
+  * {@code setFuture} and {@code null} is returned.
+  */
+ @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(
+     Map<? extends K, ? extends EntryProcessor<K, V, T>> map,
+     Object... args) {
+     try {
+         if (!isAsync())
+             return delegate.invokeAll(map, args);
+
+         setFuture(delegate.invokeAllAsync(map, args));
+
+         return null;
+     }
+     catch (IgniteCheckedException | IgniteException err) {
+         throw cacheException(err);
+     }
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(
+     Map<? extends K, ? extends EntryProcessor<K, V, T>> map, Object... args) {
+     // Wrap the delegate's internal future into the public future type.
+     final IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> internalFut = delegate.invokeAllAsync(map, args);
+
+     return createFuture(internalFut);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * The name is taken from the current delegate.
+  */
+ @Override public String getName() {
+     final String cacheName = delegate.name();
+
+     return cacheName;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Returns whatever was last assigned through {@code setCacheManager}.
+  */
+ @Override public CacheManager getCacheManager() {
+     return this.cacheMgr;
+ }
+
+ /**
+  * Sets the cache manager returned by {@code getCacheManager()}.
+  *
+  * @param cacheMgr Cache manager.
+  */
+ public void setCacheManager(CacheManager cacheMgr) {
+     // Plain field assignment; no validation is performed.
+     this.cacheMgr = cacheMgr;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Synchronous form of {@code destroyAsync()}: blocks on the returned future.
+  */
+ @Override public void destroy() {
+     final IgniteFuture<?> destroyFut = destroyAsync();
+
+     destroyFut.get();
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Requests destruction of the cache named {@code ctx.name()} from the kernal's cache processor and
+  * wraps the resulting future.
+  */
+ @Override public IgniteFuture<?> destroyAsync() {
+     return new IgniteFutureImpl<>(
+         ctx.kernalContext().cache().dynamicDestroyCache(ctx.name(), false, true, false)
+     );
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Synchronous form of {@code closeAsync()}: blocks on the returned future.
+  */
+ @Override public void close() {
+     final IgniteFuture<?> closeFut = closeAsync();
+
+     closeFut.get();
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Requests a close of the cache named {@code ctx.name()} from the kernal's cache processor and wraps
+  * the resulting future.
+  */
+ @Override public IgniteFuture<?> closeAsync() {
+     return new IgniteFutureImpl<>(
+         ctx.kernalContext().cache().dynamicCloseCache(ctx.name())
+     );
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * The answer comes from the shared cache context, queried with this proxy's current cache context.
+  */
+ @Override public boolean isClosed() {
+     final boolean cacheClosed = ctx.kernalContext().cache().context().closed(ctx);
+
+     return cacheClosed;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public <T> T unwrap(Class<T> clazz) {
+ if (clazz.isAssignableFrom(getClass()))
+ return (T)this;
+ else if (clazz.isAssignableFrom(IgniteEx.class))
+ return (T)ctx.grid();
+
+ throw new IllegalArgumentException("Unwrapping to class is not supported: " + clazz);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * The listener is registered through the continuous query manager. The keep-binary flag is taken
+  * from the per-call operation context and is {@code false} when there is none.
+  */
+ @Override public void registerCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) {
+     try {
+         final CacheOperationContext callCtx = ctx.operationContextPerCall();
+
+         final boolean keepBinary = callCtx != null && callCtx.isKeepBinary();
+
+         ctx.continuousQueries().executeJCacheQuery(lsnrCfg, false, keepBinary);
+     }
+     catch (IgniteCheckedException | IgniteException err) {
+         throw cacheException(err);
+     }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Cancels the query for this listener configuration on the continuous query manager.
+  */
+ @Override public void deregisterCacheEntryListener(CacheEntryListenerConfiguration<K, V> lsnrCfg) {
+     try {
+         ctx.continuousQueries().cancelJCacheQuery(lsnrCfg);
+     }
+     catch (IgniteCheckedException | IgniteException err) {
+         // Failures are converted (or rethrown) by cacheException.
+         throw cacheException(err);
+     }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * The iterator is obtained from {@code ctx.cache()}, not from {@code delegate}.
+  */
+ @Override public Iterator<Cache.Entry<K, V>> iterator() {
+     try {
+         final Iterator<Cache.Entry<K, V>> entries = ctx.cache().igniteIterator();
+
+         return entries;
+     }
+     catch (IgniteCheckedException | IgniteException err) {
+         throw cacheException(err);
+     }
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * The new proxy uses the same context and delegate as this one, with {@code true} as the third
+  * constructor argument.
+  */
+ @Override protected IgniteCache<K, V> createAsyncInstance() {
+     final IgniteCacheProxyImpl<K, V> asyncProxy = new IgniteCacheProxyImpl<K, V>(ctx, delegate, true);
+
+     return asyncProxy;
+ }
+
+ /**
+  * Creates projection that will operate with binary objects.
+  * <p>
+  * Projection returned by this method will force cache not to deserialize binary objects, so keys and
+  * values will be returned from cache API methods without changes. Therefore, signature of the
+  * projection can contain only following types:
+  * <ul>
+  * <li>{@code BinaryObject} for binary classes</li>
+  * <li>All primitives (byte, int, ...) and their boxed versions (Byte, Integer, ...)</li>
+  * <li>Arrays of primitives (byte[], int[], ...)</li>
+  * <li>{@link String} and array of {@link String}s</li>
+  * <li>{@link UUID} and array of {@link UUID}s</li>
+  * <li>{@link Date} and array of {@link Date}s</li>
+  * <li>{@link java.sql.Timestamp} and array of {@link java.sql.Timestamp}s</li>
+  * <li>Enums and array of enums</li>
+  * <li>Maps, collections and array of objects (but objects inside them will still be converted if
+  * they are binary)</li>
+  * </ul>
+  * <p>
+  * For example, if you use {@link Integer} as a key and {@code Value} class as a value (which will be
+  * stored in binary format), you should acquire following projection to avoid deserialization:
+  * <pre>
+  * IgniteInternalCache&lt;Integer, GridBinaryObject&gt; prj = cache.keepBinary();
+  *
+  * // Value is not deserialized and returned in binary format.
+  * GridBinaryObject po = prj.get(1);
+  * </pre>
+  * <p>
+  * Note that this method makes sense only if cache is working in binary mode
+  * ({@code CacheConfiguration#isBinaryEnabled()} returns {@code true}). If not, this method is no-op
+  * and will return current projection.
+  * <p>
+  * This class does not provide the projection itself: the method unconditionally throws
+  * {@link UnsupportedOperationException}.
+  *
+  * @return Projection for binary objects.
+  * @throws UnsupportedOperationException Always.
+  */
+ @SuppressWarnings("unchecked")
+ @Override public <K1, V1> IgniteCache<K1, V1> keepBinary() {
+     throw new UnsupportedOperationException();
+ }
+
+ /**
+  * Not supported by this class: the method unconditionally throws
+  * {@link UnsupportedOperationException}.
+  *
+  * @param dataCenterId Data center ID.
+  * @return Projection for data center id.
+  * @throws UnsupportedOperationException Always.
+  */
+ @SuppressWarnings("unchecked")
+ @Override public IgniteCache<K, V> withDataCenterId(byte dataCenterId) {
+     throw new UnsupportedOperationException();
+ }
+
+ /**
+  * Not supported by this class: the method unconditionally throws
+  * {@link UnsupportedOperationException}.
+  *
+  * @return Cache with skip store enabled.
+  * @throws UnsupportedOperationException Always.
+  */
+ @Override public IgniteCache<K, V> skipStore() {
+     throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Method converts exception to IgniteCacheRestartingException in case of cache restarting
+ * or to CacheException in other cases.
+ *
+ * @param e {@code IgniteCheckedException} or {@code IgniteException}.
+ * @return Cache exception.
+ */
+ private RuntimeException cacheException(Exception e) {
+ GridFutureAdapter<Void> restartFut = this.restartFut.get();
+
+ if (restartFut != null && !restartFut.isDone()) {
+ if (X.hasCause(e, CacheStoppedException.class) || X.hasSuppressed(e, CacheStoppedException.class))
+ throw new IgniteCacheRestartingException(new IgniteFutureImpl<>(restartFut), "Cache is restarting: " +
+ ctx.name());
+ }
+
+ if (e instanceof IgniteCheckedException)
+ return CU.convertToCacheException((IgniteCheckedException) e);
+
+ if (e instanceof RuntimeException)
+ return (RuntimeException) e;
+
+ throw new IllegalStateException("Unknown exception", e);
+ }
+
+ /**
+ * @param fut Future for async operation.
+ */
+ private <R> void setFuture(IgniteInternalFuture<R> fut) {
+ curFut.set(createFuture(fut));
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * The public future is always a new {@code IgniteCacheFutureImpl} around the internal one.
+  */
+ @Override protected <R> IgniteFuture<R> createFuture(IgniteInternalFuture<R> fut) {
+     final IgniteFuture<R> publicFut = new IgniteCacheFutureImpl<>(fut);
+
+     return publicFut;
+ }
+
+ /**
+  * Builds a new internal proxy from the current context, the current delegate and the per-call
+  * operation context. A new instance is created on every call.
+  *
+  * @return Internal proxy.
+  */
+ @Override public GridCacheProxyImpl<K, V> internalProxy() {
+     final CacheOperationContext callCtx = ctx.operationContextPerCall();
+
+     return new GridCacheProxyImpl<>(ctx, delegate, callCtx);
+ }
+
+ /**
+  * Reports the proxy-level closed flag, which is set by {@code closeProxy()}.
+  *
+  * @return {@code True} if proxy was closed.
+  */
+ @Override public boolean isProxyClosed() {
+     return this.closed;
+ }
+
+ /**
+  * Closes this proxy instance by setting the proxy-level closed flag. Nothing else is done here.
+  */
+ @Override public void closeProxy() {
+     this.closed = true;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * The collection is returned exactly as provided by the delegate.
+  */
+ @Override public Collection<Integer> lostPartitions() {
+     final Collection<Integer> lostParts = delegate.lostPartitions();
+
+     return lostParts;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Writes two objects, in this order: the cache context, then the delegate. {@code readExternal}
+  * reads them back in the same order.
+  */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+     // 1. Cache context.
+     out.writeObject(this.ctx);
+
+     // 2. Delegate cache.
+     out.writeObject(this.delegate);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Reads two objects, in the order {@code writeExternal} wrote them: the cache context, then the
+  * delegate. Each field is assigned as soon as its object has been read.
+  */
+ @SuppressWarnings({"unchecked"})
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+     final Object ctxObj = in.readObject();
+
+     this.ctx = (GridCacheContext<K, V>)ctxObj;
+
+     final Object delegateObj = in.readObject();
+
+     this.delegate = (IgniteInternalCache<K, V>)delegateObj;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Calls {@code forceRebalance()} on the context's preloader and wraps the resulting future.
+  */
+ @Override public IgniteFuture<?> rebalance() {
+     final IgniteFuture<?> rebalanceFut = new IgniteFutureImpl<>(ctx.preloader().forceRebalance());
+
+     return rebalanceFut;
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * If the database manager has no index rebuild future for this cache id, a finished future is
+  * returned; otherwise the rebuild future is wrapped.
+  */
+ @Override public IgniteFuture<?> indexReadyFuture() {
+     IgniteInternalFuture rebuildFut = ctx.shared().database().indexRebuildFuture(ctx.cacheId());
+
+     if (rebuildFut != null)
+         return new IgniteFutureImpl<>(rebuildFut);
+
+     return new IgniteFinishedFutureImpl<>();
+ }
+
+ /**
+  * Gets value without waiting for topology changes.
+  * <p>
+  * Failures raised by the delegate are converted by {@code cacheException}.
+  *
+  * @param key Key.
+  * @return Value.
+  */
+ @Override public V getTopologySafe(K key) {
+     try {
+         final V val = delegate.getTopologySafe(key);
+
+         return val;
+     }
+     catch (IgniteCheckedException | IgniteException err) {
+         throw cacheException(err);
+     }
+ }
+
+ /**
+  * Throws {@code IgniteCacheRestartingException} if proxy is restarting.
+  * <p>
+  * The restart future is read exactly once, so the future handed to the exception is the same one
+  * that was tested for {@code null}. Reading the reference a second time could observe {@code null}
+  * if {@code onRestarted} cleared it in between.
+  *
+  * @throws IgniteCacheRestartingException If a restart future is currently set.
+  */
+ public void checkRestart() {
+     // Same guard as isRestarting(): the holder itself may be absent.
+     if (restartFut == null)
+         return;
+
+     GridFutureAdapter<Void> fut = restartFut.get();
+
+     if (fut != null)
+         throw new IgniteCacheRestartingException(new IgniteFutureImpl<>(fut), "Cache is restarting: " +
+             context().name());
+ }
+
+ /**
+  * Tells whether a restart future is currently set on this proxy.
+  *
+  * @return True if proxy is restarting, false in other case.
+  */
+ public boolean isRestarting() {
+     if (restartFut == null)
+         return false;
+
+     return restartFut.get() != null;
+ }
+
+ /**
+ * Restarts this cache proxy.
+ * <p>
+ * Publishes a fresh restart future with a single compare-and-set against the value read just before it.
+ * If the swap succeeds and an earlier restart future was present, that earlier future is completed
+ * (with the same error, if any) once the new future completes. If the swap fails, nothing else is done
+ * and the newly created future is discarded.
+ */
+ public void restart() {
+ GridFutureAdapter<Void> restartFut = new GridFutureAdapter<>(); // Local; shadows the field of the same name.
+
+ final GridFutureAdapter<Void> currentFut = this.restartFut.get();
+
+ boolean changed = this.restartFut.compareAndSet(currentFut, restartFut); // Succeeds only if the field still holds currentFut.
+
+ if (changed && currentFut != null)
+ restartFut.listen(new IgniteInClosure<IgniteInternalFuture<Void>>() {
+ @Override public void apply(IgniteInternalFuture<Void> future) {
+ if (future.error() != null)
+ currentFut.onDone(future.error()); // Propagate the failure to the replaced future.
+ else
+ currentFut.onDone();
+ }
+ });
+ }
+
+ /**
+ * Mark this proxy as restarted.
+ * <p>
+ * Swaps in the new context and delegate, then completes the pending restart future and clears the
+ * reference (only if it still holds that same future). A restart future is expected to be present;
+ * this is checked with an {@code assert} only.
+ *
+ * @param ctx New cache context.
+ * @param delegate New delegate.
+ */
+ public void onRestarted(GridCacheContext ctx, IgniteInternalCache delegate) {
+ GridFutureAdapter<Void> restartFut = this.restartFut.get();
+
+ assert restartFut != null;
+
+ this.ctx = ctx; // New state is assigned before the restart future is completed below.
+ this.delegate = delegate;
+
+ restartFut.onDone();
+
+ this.restartFut.compareAndSet(restartFut, null); // No-op if another future has replaced it meanwhile.
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * The string is produced by {@code S.toString} for this class.
+  */
+ @Override public String toString() {
+     final String str = S.toString(IgniteCacheProxyImpl.class, this);
+
+     return str;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java
index 0cec1fe..61ab122 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/dr/IgniteDrDataStreamerCacheUpdater.java
@@ -62,7 +62,7 @@ public class IgniteDrDataStreamerCacheUpdater implements StreamReceiver<KeyCache
IgniteLogger log = ctx.log(IgniteDrDataStreamerCacheUpdater.class);
GridCacheAdapter internalCache = ctx.cache().internalCache(cacheName);
- CacheOperationContext opCtx = ((IgniteCacheProxy)cache0).operationContext();
+ CacheOperationContext opCtx = ((IgniteCacheProxy)cache0).context().operationContextPerCall();
IgniteInternalCache cache =
opCtx != null ? new GridCacheProxyImpl(internalCache.context(), internalCache, opCtx) : internalCache;
http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
index 6207995..ef914a9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java
@@ -17,6 +17,18 @@
package org.apache.ignite.internal.processors.platform.cache;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import javax.cache.Cache;
+import javax.cache.integration.CompletionListener;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.EntryProcessorResult;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
@@ -41,8 +53,8 @@ import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget;
import org.apache.ignite.internal.processors.platform.PlatformContext;
import org.apache.ignite.internal.processors.platform.PlatformNativeException;
-import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy;
import org.apache.ignite.internal.processors.platform.PlatformTarget;
+import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy;
import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQuery;
import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryProxy;
import org.apache.ignite.internal.processors.platform.cache.query.PlatformFieldsQueryCursor;
@@ -63,19 +75,6 @@ import org.apache.ignite.transactions.TransactionDeadlockException;
import org.apache.ignite.transactions.TransactionTimeoutException;
import org.jetbrains.annotations.Nullable;
-import javax.cache.Cache;
-import javax.cache.integration.CompletionListener;
-import javax.cache.processor.EntryProcessorException;
-import javax.cache.processor.EntryProcessorResult;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-
/**
* Native cache wrapper implementation.
*/
@@ -1009,7 +1008,7 @@ public class PlatformCache extends PlatformAbstractTarget {
}
case OP_WITH_NO_RETRIES: {
- CacheOperationContext opCtx = cache.operationContext();
+ CacheOperationContext opCtx = cache.context().operationContextPerCall();
if (opCtx != null && opCtx.noRetries())
return this;
@@ -1018,7 +1017,9 @@ public class PlatformCache extends PlatformAbstractTarget {
}
case OP_WITH_SKIP_STORE: {
- if (cache.delegate().skipStore())
+ CacheOperationContext opCtx = cache.context().operationContextPerCall();
+
+ if (opCtx != null && opCtx.skipStore())
return this;
return copy(rawCache.withSkipStore(), keepBinary);
http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheEntryProcessorCopySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheEntryProcessorCopySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheEntryProcessorCopySelfTest.java
index 7affa8c..7005e14 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheEntryProcessorCopySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheEntryProcessorCopySelfTest.java
@@ -31,6 +31,7 @@ import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -149,7 +150,15 @@ public class CacheEntryProcessorCopySelfTest extends GridCommonAbstractTest {
}
});
- CacheObject obj = ((GridCacheAdapter)((IgniteCacheProxy)cache).delegate()).peekEx(0).peekVisibleValue();
+ GridCacheAdapter ca = (GridCacheAdapter)((IgniteCacheProxy)cache).internalProxy().delegate();
+
+ GridCacheEntryEx entry = ca.entryEx(0);
+
+ entry.unswap();
+
+ CacheObject obj = entry.peekVisibleValue();
+
+ ca.context().evicts().touch(entry, AffinityTopologyVersion.NONE);
int actCnt = cnt.get();
http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java
index f67e247..c53bc4b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStopAndDestroySelfTest.java
@@ -768,6 +768,7 @@ public class CacheStopAndDestroySelfTest extends GridCommonAbstractTest {
cache.close();
+ // Check second close succeeds without exception.
cache.close();
try {
http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
index 822537c..9376971 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java
@@ -34,6 +34,7 @@ import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
@@ -246,6 +247,7 @@ public abstract class GridCacheAbstractSelfTest extends GridCommonAbstractTest {
}
}
+ cfg.setAffinity(new RendezvousAffinityFunction(false, 4096));
cfg.setCacheMode(cacheMode());
cfg.setAtomicityMode(atomicityMode());
cfg.setWriteSynchronizationMode(writeSynchronization());
http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOnCopyFlagAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOnCopyFlagAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOnCopyFlagAbstractSelfTest.java
index 80404ce..2a90bf6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOnCopyFlagAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheOnCopyFlagAbstractSelfTest.java
@@ -178,7 +178,7 @@ public abstract class GridCacheOnCopyFlagAbstractSelfTest extends GridCommonAbst
cache.put(key, val);
CacheObject obj =
- ((GridCacheAdapter)((IgniteCacheProxy)cache).delegate()).peekEx(key).peekVisibleValue();
+ ((GridCacheAdapter)((IgniteCacheProxy)cache).internalProxy().delegate()).peekEx(key).peekVisibleValue();
// Check thar internal entry wasn't changed.
assertEquals(i, getValue(obj, cache));
@@ -211,7 +211,7 @@ public abstract class GridCacheOnCopyFlagAbstractSelfTest extends GridCommonAbst
cache.put(key, newTestVal);
- obj = ((GridCacheAdapter)((IgniteCacheProxy)cache).delegate()).peekEx(key).peekVisibleValue();
+ obj = ((GridCacheAdapter)((IgniteCacheProxy)cache).internalProxy().delegate()).peekEx(key).peekVisibleValue();
// Check thar internal entry wasn't changed.
assertEquals(-i, getValue(obj, cache));
@@ -290,7 +290,7 @@ public abstract class GridCacheOnCopyFlagAbstractSelfTest extends GridCommonAbst
});
CacheObject obj =
- ((GridCacheAdapter)((IgniteCacheProxy)cache).delegate()).peekEx(key).peekVisibleValue();
+ ((GridCacheAdapter)((IgniteCacheProxy)cache).internalProxy().delegate()).peekEx(key).peekVisibleValue();
assertNotEquals(WRONG_VALUE, getValue(obj, cache));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java
index 3c5fe0e..e068252 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheValueConsistencyAbstractSelfTest.java
@@ -124,7 +124,7 @@ public abstract class GridCacheValueConsistencyAbstractSelfTest extends GridCach
info("Node is reported as NOT affinity node for key [key=" + key +
", nodeId=" + locNode.id() + ']');
- if (nearEnabled() && cache == cache0)
+ if (nearEnabled() && cache.equals(cache0))
assertEquals((Integer)i, cache0.localPeek(key));
else
assertNull(cache0.localPeek(key));
@@ -184,7 +184,7 @@ public abstract class GridCacheValueConsistencyAbstractSelfTest extends GridCach
info("Node is reported as NOT affinity node for key [key=" + key +
", nodeId=" + locNode.id() + ']');
- if (nearEnabled() && cache == cache0)
+ if (nearEnabled() && cache.equals(cache0))
assertEquals((Integer)i, cache0.localPeek(key));
else
assertNull(cache0.localPeek(key));
http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java
index dcba92f..8d5462d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheConfigVariationsFullApiTest.java
@@ -17,8 +17,6 @@
package org.apache.ignite.internal.processors.cache;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
@@ -49,12 +47,13 @@ import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.EntryProcessorResult;
import javax.cache.processor.MutableEntry;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
import junit.framework.AssertionFailedError;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteTransactions;
import org.apache.ignite.cache.CacheEntry;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
@@ -83,7 +82,6 @@ import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.IgniteCacheConfigVariationsAbstractTest;
import org.apache.ignite.transactions.Transaction;
http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
index bcf46fd..e473d52 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java
@@ -65,7 +65,6 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.jetbrains.annotations.Nullable;
http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
index d3269c3..b55e3d0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheGroupsTest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.cache;
-import com.google.common.collect.Sets;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
@@ -50,6 +49,7 @@ import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.MutableEntry;
+import com.google.common.collect.Sets;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
@@ -78,8 +78,8 @@ import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.binary.BinaryMarshaller;
-import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicyFactory;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.lang.GridIterator;
@@ -87,6 +87,7 @@ import org.apache.ignite.internal.util.lang.GridPlainCallable;
import org.apache.ignite.internal.util.lang.gridfunc.ContainsPredicate;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
@@ -2892,10 +2893,10 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
IgniteInternalFuture cacheFut = GridTestUtils.runAsync(new Runnable() {
@Override public void run() {
- try {
- int cntr = 0;
+ int cntr = 0;
- while (!stop.get()) {
+ while (!stop.get()) {
+ try {
ThreadLocalRandom rnd = ThreadLocalRandom.current();
String grp;
@@ -2927,13 +2928,20 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
node.destroyCache(cache.getName());
}
- }
- catch (Exception e) {
- err.set(true);
+ catch (Exception e) {
+ if (X.hasCause(e, CacheStoppedException.class)) {
+ // Cache operation can be blocked on
+ // awaiting new topology version and cancelled with CacheStoppedException cause.
- log.error("Unexpected error(2): " + e, e);
+ continue;
+ }
- stop.set(true);
+ err.set(true);
+
+ log.error("Unexpected error(2): " + e, e);
+
+ stop.set(true);
+ }
}
}
}, "cache-destroy-thread");
@@ -3706,7 +3714,7 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
final AtomicReferenceArray<IgniteCache> caches = new AtomicReferenceArray<>(CACHES);
- for (int i = 0; i < 10; i++) {
+ for (int i = 0; i < CACHES; i++) {
CacheAtomicityMode atomicityMode = i % 2 == 0 ? ATOMIC : TRANSACTIONAL;
caches.set(i,
@@ -3799,28 +3807,41 @@ public class IgniteCacheGroupsTest extends GridCommonAbstractTest {
IgniteInternalFuture opFut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
@Override public void run() {
- try {
- ThreadLocalRandom rnd = ThreadLocalRandom.current();
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
- while (!stop.get()) {
+ while (!stop.get()) {
+ try {
int idx = rnd.nextInt(CACHES);
IgniteCache cache = caches.get(idx);
if (cache != null && caches.compareAndSet(idx, cache, null)) {
- for (int i = 0; i < 10; i++)
- cacheOperation(rnd, cache);
-
- caches.set(idx, cache);
+ try {
+ for (int i = 0; i < 10; i++)
+ cacheOperation(rnd, cache);
+ }
+ catch (Exception e) {
+ if (X.hasCause(e, CacheStoppedException.class)) {
+ // Cache operation can be blocked on
+ // awaiting new topology version and cancelled with CacheStoppedException cause.
+
+ continue;
+ }
+
+ throw e;
+ }
+ finally {
+ caches.set(idx, cache);
+ }
}
}
- }
- catch (Exception e) {
- err.set(e);
+ catch (Exception e) {
+ err.set(e);
- log.error("Unexpected error: " + e, e);
+ log.error("Unexpected error: " + e, e);
- stop.set(true);
+ stop.set(true);
+ }
}
}
}, 8, "op-thread");
http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStartStopLoadTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStartStopLoadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStartStopLoadTest.java
index 7cb9861..25b90c4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStartStopLoadTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheStartStopLoadTest.java
@@ -113,7 +113,6 @@ public class IgniteCacheStartStopLoadTest extends GridCommonAbstractTest {
cache.put(1, obj);
- weakMap.put(((IgniteCacheProxy)cache).delegate(), Boolean.TRUE);
weakMap.put(obj, Boolean.TRUE);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/5172541f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/MemoryPolicyInitializationTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/MemoryPolicyInitializationTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/MemoryPolicyInitializationTest.java
index 9a49b6c..6493f88 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/MemoryPolicyInitializationTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/MemoryPolicyInitializationTest.java
@@ -24,6 +24,7 @@ import org.apache.ignite.configuration.MemoryConfiguration;
import org.apache.ignite.configuration.MemoryPolicyConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -217,7 +218,7 @@ public class MemoryPolicyInitializationTest extends GridCommonAbstractTest {
* @param plcName Policy name.
*/
private void verifyCacheMemoryPolicy(IgniteCache cache, String plcName) {
- GridCacheContext ctx = U.field(cache, "ctx");
+ GridCacheContext ctx = ((IgniteCacheProxy) cache).context();
assertEquals(plcName, ctx.memoryPolicy().config().getName());
}