You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2015/11/18 14:40:58 UTC
[42/50] [abbrv] ignite git commit: 'Single' operations optimizations
for tx cache.
http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java
new file mode 100644
index 0000000..22f04a8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public class IgniteTxRemoteSingleStateImpl extends IgniteTxRemoteStateAdapter {
+ /** */
+ private IgniteTxEntry entry;
+
+ /** {@inheritDoc} */
+ @Override public void addWriteEntry(IgniteTxKey key, IgniteTxEntry e) {
+ this.entry = e;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void clearEntry(IgniteTxKey key) {
+ if (entry != null && entry.txKey().equals(key))
+ entry = null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteTxEntry entry(IgniteTxKey key) {
+ if (entry != null && entry.txKey().equals(key))
+ return entry;
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasWriteKey(IgniteTxKey key) {
+ return entry != null && entry.txKey().equals(key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Set<IgniteTxKey> readSet() {
+ return Collections.emptySet();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Set<IgniteTxKey> writeSet() {
+ return entry != null ? Collections.singleton(entry.txKey()) : Collections.<IgniteTxKey>emptySet();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<IgniteTxEntry> writeEntries() {
+ return entry != null ? Collections.singletonList(entry) : Collections.<IgniteTxEntry>emptyList();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<IgniteTxEntry> readEntries() {
+ return Collections.emptyList();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<IgniteTxKey, IgniteTxEntry> writeMap() {
+ return entry != null ? Collections.singletonMap(entry.txKey(), entry) :
+ Collections.<IgniteTxKey, IgniteTxEntry>emptyMap();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<IgniteTxKey, IgniteTxEntry> readMap() {
+ return Collections.emptyMap();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean empty() {
+ return entry == null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<IgniteTxEntry> allEntries() {
+ return entry != null ? Collections.singletonList(entry) : Collections.<IgniteTxEntry>emptyList();
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public IgniteTxEntry singleWrite() {
+ return entry;
+ }
+
+ /** {@inheritDoc} */
+ public String toString() {
+ return S.toString(IgniteTxRemoteSingleStateImpl.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteState.java
new file mode 100644
index 0000000..b8290a1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteState.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+/**
+ *
+ */
+public interface IgniteTxRemoteState extends IgniteTxState {
+ /**
+ * @param key Key.
+ * @param e Entry.
+ */
+ public void addWriteEntry(IgniteTxKey key, IgniteTxEntry e);
+
+ /**
+ * @param key Entry key.
+ */
+ public void clearEntry(IgniteTxKey key);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
new file mode 100644
index 0000000..e7c4c96
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.Collection;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public abstract class IgniteTxRemoteStateAdapter implements IgniteTxRemoteState {
+ /** {@inheritDoc} */
+ @Override public boolean implicitSingle() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Integer firstCacheId() {
+ assert false;
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void awaitLastFut(GridCacheSharedContext cctx) {
+ assert false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteCheckedException validateTopology(GridCacheSharedContext cctx, GridDhtTopologyFuture topFut) {
+ assert false;
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean sync(GridCacheSharedContext cctx) {
+ assert false;
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasNearCache(GridCacheSharedContext cctx) {
+ assert false;
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void addActiveCache(GridCacheContext cacheCtx, IgniteTxLocalAdapter tx)
+ throws IgniteCheckedException {
+ assert false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridDhtTopologyFuture topologyReadLock(GridCacheSharedContext cctx, GridFutureAdapter<?> fut) {
+ assert false;
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void topologyReadUnlock(GridCacheSharedContext cctx) {
+ assert false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean storeUsed(GridCacheSharedContext cctx) {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<CacheStoreManager> stores(GridCacheSharedContext cctx) {
+ assert false;
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public GridCacheContext singleCacheContext(GridCacheSharedContext cctx) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onTxEnd(GridCacheSharedContext cctx, IgniteInternalTx tx, boolean commit) {
+ assert false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateImpl.java
new file mode 100644
index 0000000..32bc646
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateImpl.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ *
+ */
+public class IgniteTxRemoteStateImpl extends IgniteTxRemoteStateAdapter {
+ /** Read set. */
+ @GridToStringInclude
+ protected Map<IgniteTxKey, IgniteTxEntry> readMap;
+
+ /** Write map. */
+ @GridToStringInclude
+ protected Map<IgniteTxKey, IgniteTxEntry> writeMap;
+
+ /**
+ * @param readMap Read map.
+ * @param writeMap Write map.
+ */
+ public IgniteTxRemoteStateImpl(Map<IgniteTxKey, IgniteTxEntry> readMap,
+ Map<IgniteTxKey, IgniteTxEntry> writeMap) {
+ this.readMap = readMap;
+ this.writeMap = writeMap;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteTxEntry entry(IgniteTxKey key) {
+ IgniteTxEntry e = writeMap == null ? null : writeMap.get(key);
+
+ if (e == null)
+ e = readMap == null ? null : readMap.get(key);
+
+ return e;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasWriteKey(IgniteTxKey key) {
+ return writeMap.containsKey(key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Set<IgniteTxKey> readSet() {
+ return readMap.keySet();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Set<IgniteTxKey> writeSet() {
+ return writeMap.keySet();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<IgniteTxEntry> writeEntries() {
+ return writeMap.values();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<IgniteTxEntry> readEntries() {
+ return readMap.values();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<IgniteTxKey, IgniteTxEntry> writeMap() {
+ return writeMap;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<IgniteTxKey, IgniteTxEntry> readMap() {
+ return readMap;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean empty() {
+ return readMap.isEmpty() && writeMap.isEmpty();
+ }
+
+ /** {@inheritDoc} */
+ public void addWriteEntry(IgniteTxKey key, IgniteTxEntry e) {
+ writeMap.put(key, e);
+ }
+
+ /** {@inheritDoc} */
+ public void clearEntry(IgniteTxKey key) {
+ readMap.remove(key);
+ writeMap.remove(key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<IgniteTxEntry> allEntries() {
+ return F.concat(false, writeEntries(), readEntries());
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteTxEntry singleWrite() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ public String toString() {
+ return S.toString(IgniteTxRemoteStateImpl.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
new file mode 100644
index 0000000..81707ba
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public interface IgniteTxState {
+ /**
+ *
+ * @return Flag indicating whether transaction is implicit with only one key.
+ */
+ public boolean implicitSingle();
+
+ /**
+ * @return First tx cache id.
+ */
+ @Nullable public Integer firstCacheId();
+
+ /**
+ * @param cctx Context.
+ * @return cctx Non-null cache context if tx has only one active cache.
+ */
+ @Nullable public GridCacheContext singleCacheContext(GridCacheSharedContext cctx);
+
+ /**
+ * @param cctx Awaits for previous async operations on active caches to be completed.
+ */
+ public void awaitLastFut(GridCacheSharedContext cctx);
+
+ /**
+ * @param cctx Context.
+ * @param topFut Topology future.
+ * @return Error if validation failed.
+ */
+ public IgniteCheckedException validateTopology(GridCacheSharedContext cctx, GridDhtTopologyFuture topFut);
+
+ /**
+ * @param cctx Context.
+ * @return {@code True} if transaction is fully synchronous.
+ */
+ public boolean sync(GridCacheSharedContext cctx);
+
+ /**
+ * @param cctx Context.
+ * @return {@code True} is tx has active near cache.
+ */
+ public boolean hasNearCache(GridCacheSharedContext cctx);
+
+ /**
+ * @param cacheCtx Ccntext.
+ * @param tx Transaction.
+ * @throws IgniteCheckedException If cache check failed.
+ */
+ public void addActiveCache(GridCacheContext cacheCtx, IgniteTxLocalAdapter tx) throws IgniteCheckedException;
+
+ /**
+ * @param cctx Context.
+ * @param fut Future to finish with error if some cache is stopping.
+ * @return Topology future.
+ */
+ public GridDhtTopologyFuture topologyReadLock(GridCacheSharedContext cctx, GridFutureAdapter<?> fut);
+
+ /**
+ * @param cctx Context.
+ */
+ public void topologyReadUnlock(GridCacheSharedContext cctx);
+
+ /**
+ * @param cctx Context.
+ * @return {@code True} if transaction is allowed to use store and transactions spans one or more caches with
+ * store enabled.
+ */
+ public boolean storeUsed(GridCacheSharedContext cctx);
+
+ /**
+ * @param cctx Context.
+ * @return Configured stores for active caches.
+ */
+ public Collection<CacheStoreManager> stores(GridCacheSharedContext cctx);
+
+ /**
+ * @param cctx Context.
+ * @param tx Transaction.
+ * @param commit Commit flag.
+ */
+ public void onTxEnd(GridCacheSharedContext cctx, IgniteInternalTx tx, boolean commit);
+
+ /**
+ * @param key Key.
+ * @return Entry.
+ */
+ @Nullable public IgniteTxEntry entry(IgniteTxKey key);
+
+ /**
+ * @param key Key.
+ * @return {@code True} if tx has write key.
+ */
+ public boolean hasWriteKey(IgniteTxKey key);
+
+ /**
+ * @return Read entries keys.
+ */
+ public Set<IgniteTxKey> readSet();
+
+ /**
+ * @return Write entries keys.
+ */
+ public Set<IgniteTxKey> writeSet();
+
+ /**
+ * @return Write entries.
+ */
+ public Collection<IgniteTxEntry> writeEntries();
+
+ /**
+ * @return Read entries.
+ */
+ public Collection<IgniteTxEntry> readEntries();
+
+ /**
+ * @return Write entries map.
+ */
+ public Map<IgniteTxKey, IgniteTxEntry> writeMap();
+
+ /**
+ * @return Read entries map.
+ */
+ public Map<IgniteTxKey, IgniteTxEntry> readMap();
+
+ /**
+ * @return All entries.
+ */
+ public Collection<IgniteTxEntry> allEntries();
+
+ /**
+ * @return Non-null entry if tx has only one write entry.
+ */
+ @Nullable public IgniteTxEntry singleWrite();
+
+ /**
+ * @return {@code True} if transaction is empty.
+ */
+ public boolean empty();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
new file mode 100644
index 0000000..c95fb19
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
+ /** Active cache IDs. */
+ private Set<Integer> activeCacheIds = new HashSet<>();
+ /** Per-transaction read map. */
+
+ @GridToStringInclude
+ protected Map<IgniteTxKey, IgniteTxEntry> txMap;
+
+ /** Read view on transaction map. */
+ @GridToStringExclude
+ protected IgniteTxMap readView;
+
+ /** Write view on transaction map. */
+ @GridToStringExclude
+ protected IgniteTxMap writeView;
+
+ /** {@inheritDoc} */
+ @Override public boolean implicitSingle() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Integer firstCacheId() {
+ return F.first(activeCacheIds);
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public GridCacheContext singleCacheContext(GridCacheSharedContext cctx) {
+ if (activeCacheIds.size() == 1) {
+ int cacheId = F.first(activeCacheIds);
+
+ return cctx.cacheContext(cacheId);
+ }
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void awaitLastFut(GridCacheSharedContext cctx) {
+ for (Integer cacheId : activeCacheIds)
+ cctx.cacheContext(cacheId).cache().awaitLastFut();
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteCheckedException validateTopology(GridCacheSharedContext cctx,
+ GridDhtTopologyFuture topFut) {
+ StringBuilder invalidCaches = null;
+
+ for (Integer cacheId : activeCacheIds) {
+ GridCacheContext ctx = cctx.cacheContext(cacheId);
+
+ assert ctx != null : cacheId;
+
+ Throwable err = topFut.validateCache(ctx);
+
+ if (err != null) {
+ if (invalidCaches != null)
+ invalidCaches.append(", ");
+ else
+ invalidCaches = new StringBuilder();
+
+ invalidCaches.append(U.maskName(ctx.name()));
+ }
+ }
+
+ if (invalidCaches != null) {
+ return new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " +
+ invalidCaches.toString());
+ }
+
+ for (int cacheId : activeCacheIds) {
+ GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
+
+ if (CU.affinityNodes(cacheCtx, topFut.topologyVersion()).isEmpty()) {
+ return new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all " +
+ "partition nodes left the grid): " + cacheCtx.name());
+ }
+ }
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean sync(GridCacheSharedContext cctx) {
+ for (int cacheId : activeCacheIds) {
+ if (cctx.cacheContext(cacheId).config().getWriteSynchronizationMode() == FULL_SYNC)
+ return true;
+ }
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasNearCache(GridCacheSharedContext cctx) {
+ for (Integer cacheId : activeCacheIds) {
+ GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+
+ if (cacheCtx.isNear())
+ return true;
+ }
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void addActiveCache(GridCacheContext cacheCtx, IgniteTxLocalAdapter tx)
+ throws IgniteCheckedException {
+ GridCacheSharedContext cctx = cacheCtx.shared();
+
+ int cacheId = cacheCtx.cacheId();
+
+ // Check if we can enlist new cache to transaction.
+ if (!activeCacheIds.contains(cacheId)) {
+ String err = cctx.verifyTxCompatibility(tx, activeCacheIds, cacheCtx);
+
+ if (err != null) {
+ StringBuilder cacheNames = new StringBuilder();
+
+ int idx = 0;
+
+ for (Integer activeCacheId : activeCacheIds) {
+ cacheNames.append(cctx.cacheContext(activeCacheId).name());
+
+ if (idx++ < activeCacheIds.size() - 1)
+ cacheNames.append(", ");
+ }
+
+ throw new IgniteCheckedException("Failed to enlist new cache to existing transaction (" +
+ err +
+ ") [activeCaches=[" + cacheNames + "]" +
+ ", cacheName=" + cacheCtx.name() +
+ ", cacheSystem=" + cacheCtx.systemTx() +
+ ", txSystem=" + tx.system() + ']');
+ }
+ else
+ activeCacheIds.add(cacheId);
+
+ if (activeCacheIds.size() == 1)
+ tx.activeCachesDeploymentEnabled(cacheCtx.deploymentEnabled());
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridDhtTopologyFuture topologyReadLock(GridCacheSharedContext cctx, GridFutureAdapter<?> fut) {
+ if (activeCacheIds.isEmpty())
+ return cctx.exchange().lastTopologyFuture();
+
+ GridCacheContext<?, ?> nonLocCtx = null;
+
+ for (int cacheId : activeCacheIds) {
+ GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
+
+ if (!cacheCtx.isLocal()) {
+ nonLocCtx = cacheCtx;
+
+ break;
+ }
+ }
+
+ if (nonLocCtx == null)
+ return cctx.exchange().lastTopologyFuture();
+
+ nonLocCtx.topology().readLock();
+
+ if (nonLocCtx.topology().stopping()) {
+ fut.onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
+ nonLocCtx.name()));
+
+ return null;
+ }
+
+ return nonLocCtx.topology().topologyVersionFuture();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void topologyReadUnlock(GridCacheSharedContext cctx) {
+ if (!activeCacheIds.isEmpty()) {
+ GridCacheContext<?, ?> nonLocCtx = null;
+
+ for (int cacheId : activeCacheIds) {
+ GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
+
+ if (!cacheCtx.isLocal()) {
+ nonLocCtx = cacheCtx;
+
+ break;
+ }
+ }
+
+ if (nonLocCtx != null)
+ nonLocCtx.topology().readUnlock();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean storeUsed(GridCacheSharedContext cctx) {
+ if (!activeCacheIds.isEmpty()) {
+ for (int cacheId : activeCacheIds) {
+ CacheStoreManager store = cctx.cacheContext(cacheId).store();
+
+ if (store.configured())
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<CacheStoreManager> stores(GridCacheSharedContext cctx) {
+ Collection<Integer> cacheIds = activeCacheIds;
+
+ if (!cacheIds.isEmpty()) {
+ Collection<CacheStoreManager> stores = new ArrayList<>(cacheIds.size());
+
+ for (int cacheId : cacheIds) {
+ CacheStoreManager store = cctx.cacheContext(cacheId).store();
+
+ if (store.configured())
+ stores.add(store);
+ }
+
+ return stores;
+ }
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onTxEnd(GridCacheSharedContext cctx, IgniteInternalTx tx, boolean commit) {
+ for (int cacheId : activeCacheIds) {
+ GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+
+ onTxEnd(cacheCtx, tx, commit);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean init(int txSize) {
+ if (txMap == null) {
+ txMap = U.newLinkedHashMap(txSize > 0 ? txSize : 16);
+
+ readView = new IgniteTxMap(txMap, CU.reads());
+ writeView = new IgniteTxMap(txMap, CU.writes());
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean initialized() {
+ return txMap != null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<IgniteTxEntry> allEntries() {
+ return txMap == null ? Collections.<IgniteTxEntry>emptySet() : txMap.values();
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteTxEntry entry(IgniteTxKey key) {
+ return txMap == null ? null : txMap.get(key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasWriteKey(IgniteTxKey key) {
+ return writeView.containsKey(key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Set<IgniteTxKey> readSet() {
+ return txMap == null ? Collections.<IgniteTxKey>emptySet() : readView.keySet();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Set<IgniteTxKey> writeSet() {
+ return txMap == null ? Collections.<IgniteTxKey>emptySet() : writeView.keySet();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<IgniteTxEntry> writeEntries() {
+ return writeView == null ? Collections.<IgniteTxEntry>emptyList() : writeView.values();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<IgniteTxEntry> readEntries() {
+ return readView == null ? Collections.<IgniteTxEntry>emptyList() : readView.values();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<IgniteTxKey, IgniteTxEntry> writeMap() {
+ return writeView == null ? Collections.<IgniteTxKey, IgniteTxEntry>emptyMap() : writeView;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<IgniteTxKey, IgniteTxEntry> readMap() {
+ return readView == null ? Collections.<IgniteTxKey, IgniteTxEntry>emptyMap() : readView;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean empty() {
+ return txMap.isEmpty();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void addEntry(IgniteTxEntry entry) {
+ txMap.put(entry.txKey(), entry);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void seal() {
+ if (readView != null)
+ readView.seal();
+
+ if (writeView != null)
+ writeView.seal();
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteTxEntry singleWrite() {
+ return writeView != null && writeView.size() == 1 ? F.firstValue(writeView) : null;
+ }
+
+ /** {@inheritDoc} */
+ public String toString() {
+ return S.toString(IgniteTxStateImpl.class, this);
+ }
+}