You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/22 15:16:23 UTC
[43/50] [abbrv] ignite git commit: Merge remote-tracking branch
'remotes/community/ignite-1.7.4' into ignite-1.8.2
Merge remote-tracking branch 'remotes/community/ignite-1.7.4' into ignite-1.8.2
# Conflicts:
# modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
# modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java
# modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
# modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcRequestHandler.java
# modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
# modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
# modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
# modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
# modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/64247b92
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/64247b92
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/64247b92
Branch: refs/heads/master
Commit: 64247b9228451e46abb8029e09c7fc6ed4e16d2d
Parents: 147277d 8dd4ada
Author: sboikov <sb...@gridgain.com>
Authored: Mon Dec 19 15:54:39 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Dec 19 15:54:39 2016 +0300
----------------------------------------------------------------------
.../store/cassandra/CassandraCacheStore.java | 9 +-
.../store/cassandra/datasource/DataSource.java | 9 +
.../rest/RestProcessorMultiStartSelfTest.java | 48 +-
.../java/org/apache/ignite/IgniteServices.java | 16 +
.../apache/ignite/IgniteSystemProperties.java | 6 +
.../rendezvous/RendezvousAffinityFunction.java | 80 ++-
.../ignite/cache/store/CacheStoreAdapter.java | 6 +
.../cache/store/jdbc/CacheJdbcPojoStore.java | 19 +-
.../store/jdbc/JdbcTypesDefaultTransformer.java | 112 ++--
.../apache/ignite/internal/IgniteKernal.java | 28 +-
.../ignite/internal/IgniteServicesImpl.java | 9 +-
.../internal/binary/BinaryClassDescriptor.java | 12 +-
.../ignite/internal/binary/BinaryUtils.java | 10 +-
.../binary/builder/BinaryObjectBuilderImpl.java | 11 +-
.../discovery/GridDiscoveryManager.java | 118 +---
.../affinity/GridAffinityProcessor.java | 2 +-
.../processors/cache/CacheLockCandidates.java | 42 ++
.../cache/CacheLockCandidatesList.java | 71 +++
.../cache/CacheStoreBalancingWrapper.java | 6 +
.../processors/cache/GridCacheAdapter.java | 8 +-
.../processors/cache/GridCacheEntryEx.java | 3 +-
.../cache/GridCacheLoaderWriterStore.java | 6 +
.../processors/cache/GridCacheMapEntry.java | 117 +++-
.../processors/cache/GridCacheMvcc.java | 376 +++++++----
.../processors/cache/GridCacheMvccCallback.java | 4 +-
.../cache/GridCacheMvccCandidate.java | 80 +--
.../processors/cache/GridCacheMvccManager.java | 19 +-
.../GridCachePartitionExchangeManager.java | 157 ++---
.../processors/cache/GridCachePreloader.java | 11 +-
.../cache/GridCachePreloaderAdapter.java | 5 +-
.../processors/cache/GridCacheProcessor.java | 10 +-
.../processors/cache/GridCacheUtils.java | 17 -
.../binary/CacheObjectBinaryProcessorImpl.java | 3 +-
.../CacheDataStructuresManager.java | 6 +-
.../distributed/GridDistributedCacheEntry.java | 303 +++------
.../dht/GridClientPartitionTopology.java | 120 ++--
.../distributed/dht/GridDhtCacheEntry.java | 32 +-
.../distributed/dht/GridDhtLockFuture.java | 34 +-
.../dht/GridDhtPartitionTopology.java | 28 +-
.../dht/GridDhtPartitionTopologyImpl.java | 284 +++++----
.../dht/GridDhtTransactionalCacheAdapter.java | 1 -
.../distributed/dht/GridDhtTxPrepareFuture.java | 5 +-
.../colocated/GridDhtColocatedLockFuture.java | 8 +-
.../dht/preloader/GridDhtPartitionDemander.java | 230 ++++---
.../dht/preloader/GridDhtPartitionFullMap.java | 18 +-
.../GridDhtPartitionsExchangeFuture.java | 56 +-
.../dht/preloader/GridDhtPreloader.java | 9 +-
.../distributed/near/GridNearCacheEntry.java | 44 +-
.../distributed/near/GridNearLockFuture.java | 3 +-
.../near/GridNearTransactionalCache.java | 5 +-
.../cache/local/GridLocalCacheEntry.java | 173 ++----
.../cache/local/GridLocalLockFuture.java | 2 +-
.../cache/query/GridCacheQueryManager.java | 22 +-
.../cache/transactions/IgniteTxHandler.java | 2 +-
.../cache/transactions/IgniteTxManager.java | 5 +-
.../closure/GridClosureProcessor.java | 31 +-
.../internal/processors/job/GridJobWorker.java | 76 ++-
.../processors/odbc/OdbcRequestHandler.java | 14 +-
.../platform/PlatformContextImpl.java | 2 +-
.../dotnet/PlatformDotNetCacheStore.java | 11 +
.../platform/services/PlatformServices.java | 2 +-
.../platform/utils/PlatformUtils.java | 28 +
.../processors/rest/GridRestProcessor.java | 15 +
.../service/GridServiceProcessor.java | 15 +-
.../processors/service/GridServiceProxy.java | 18 +-
.../processors/task/GridTaskWorker.java | 7 +
.../internal/visor/query/VisorQueryJob.java | 2 +-
.../ignite/marshaller/jdk/JdkMarshaller.java | 4 +-
.../optimized/OptimizedMarshaller.java | 8 +-
.../ignite/spi/discovery/tcp/ServerImpl.java | 41 +-
.../tcp/internal/TcpDiscoveryStatistics.java | 4 +
.../resources/META-INF/classnames.properties | 86 ++-
.../AbstractAffinityFunctionSelfTest.java | 2 +-
.../jdbc/JdbcTypesDefaultTransformerTest.java | 283 +++++++++
.../IgniteComputeTopologyExceptionTest.java | 5 +-
.../binary/BinaryMarshallerSelfTest.java | 66 ++
.../GridDiscoveryManagerAliveCacheSelfTest.java | 2 +-
.../CacheSerializableTransactionsTest.java | 604 +++++++++++++++++-
.../cache/GridCacheMvccFlagsTest.java | 8 +-
.../cache/GridCacheMvccPartitionedSelfTest.java | 334 ++++++++--
.../processors/cache/GridCacheMvccSelfTest.java | 212 +++----
.../GridCachePartitionedAffinitySpreadTest.java | 7 +-
.../processors/cache/GridCacheTestEntryEx.java | 77 +--
...heapCacheMetricsForClusterGroupSelfTest.java | 141 +++++
.../cache/OffheapCacheOnClientsTest.java | 143 +++++
.../distributed/dht/GridCacheDhtTestUtils.java | 232 -------
.../GridCacheRebalancingSyncSelfTest.java | 2 +
.../CacheOffHeapAndSwapMetricsSelfTest.java | 621 -------------------
...LocalCacheOffHeapAndSwapMetricsSelfTest.java | 621 +++++++++++++++++++
.../closure/GridClosureSerializationTest.java | 177 ++++++
...gniteServiceProxyTimeoutInitializedTest.java | 284 +++++++++
.../loadtests/hashmap/GridHashMapLoadTest.java | 7 +-
.../ignite/testsuites/IgniteBasicTestSuite.java | 2 +
.../IgniteCacheMetricsSelfTestSuite.java | 6 +-
.../ignite/testsuites/IgniteCacheTestSuite.java | 2 +
.../testsuites/IgniteCacheTestSuite2.java | 2 +
.../testsuites/IgniteKernalSelfTestSuite.java | 2 +
.../resources/META-INF/classnames.properties | 114 ++++
.../processors/query/h2/IgniteH2Indexing.java | 77 ++-
.../h2/twostep/GridReduceQueryExecutor.java | 14 +-
...niteCachePartitionedFieldsQuerySelfTest.java | 25 +
101 files changed, 4783 insertions(+), 2473 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
----------------------------------------------------------------------
diff --cc modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
index 9058837,0000000..b4bed0d
mode 100644,000000..100644
--- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
@@@ -1,519 -1,0 +1,522 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Row;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import javax.cache.Cache;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreSession;
+import org.apache.ignite.cache.store.cassandra.datasource.DataSource;
+import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings;
+import org.apache.ignite.cache.store.cassandra.persistence.PersistenceController;
+import org.apache.ignite.cache.store.cassandra.session.CassandraSession;
+import org.apache.ignite.cache.store.cassandra.session.ExecutionAssistant;
+import org.apache.ignite.cache.store.cassandra.session.GenericBatchExecutionAssistant;
+import org.apache.ignite.cache.store.cassandra.session.LoadCacheCustomQueryWorker;
- import org.apache.ignite.cache.store.cassandra.session.transaction.DeleteMutation;
- import org.apache.ignite.cache.store.cassandra.session.transaction.Mutation;
- import org.apache.ignite.cache.store.cassandra.session.transaction.WriteMutation;
++import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.logger.NullLogger;
+import org.apache.ignite.resources.CacheStoreSessionResource;
+import org.apache.ignite.resources.LoggerResource;
+
+/**
+ * Implementation of {@link CacheStore} backed by Cassandra database.
+ *
+ * @param <K> Ignite cache key type.
+ * @param <V> Ignite cache value type.
+ */
+public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
+ /** Buffer to store mutations performed within a transaction. */
+ private static final String TRANSACTION_BUFFER = "CASSANDRA_TRANSACTION_BUFFER";
+
+ /** Auto-injected store session. */
+ @SuppressWarnings("unused")
+ @CacheStoreSessionResource
+ private CacheStoreSession storeSes;
+
+ /** Auto-injected logger instance. */
+ @SuppressWarnings("unused")
+ @LoggerResource
+ private IgniteLogger log;
+
+ /** Cassandra data source. */
+ private DataSource dataSrc;
+
+ /** Max worker thread count. These threads are responsible for loading the cache. */
+ private int maxPoolSize = Runtime.getRuntime().availableProcessors();
+
+ /** Controller component responsible for serialization logic. */
+ private final PersistenceController controller;
+
+ /**
+ * Store constructor.
+ *
+ * @param dataSrc Data source.
+ * @param settings Persistence settings for Ignite key and value objects.
+ * @param maxPoolSize Max worker thread count.
+ */
+ public CassandraCacheStore(DataSource dataSrc, KeyValuePersistenceSettings settings, int maxPoolSize) {
+ this.dataSrc = dataSrc;
+ this.controller = new PersistenceController(settings);
+ this.maxPoolSize = maxPoolSize;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void loadCache(IgniteBiInClosure<K, V> clo, Object... args) throws CacheLoaderException {
+ if (clo == null)
+ return;
+
+ if (args == null || args.length == 0)
+ args = new String[] {"select * from " + controller.getPersistenceSettings().getKeyspace() + "." + cassandraTable() + ";"};
+
+ ExecutorService pool = null;
+
+ Collection<Future<?>> futs = new ArrayList<>(args.length);
+
+ try {
+ pool = Executors.newFixedThreadPool(maxPoolSize);
+
+ CassandraSession ses = getCassandraSession();
+
+ for (Object obj : args) {
+ if (obj == null || !(obj instanceof String) || !((String)obj).trim().toLowerCase().startsWith("select"))
+ continue;
+
+ futs.add(pool.submit(new LoadCacheCustomQueryWorker<>(ses, (String) obj, controller, log, clo)));
+ }
+
+ for (Future<?> fut : futs)
+ U.get(fut);
+
+ if (log != null && log.isDebugEnabled() && storeSes != null)
+ log.debug("Cache loaded from db: " + storeSes.cacheName());
+ }
+ catch (IgniteCheckedException e) {
+ if (storeSes != null)
+ throw new CacheLoaderException("Failed to load Ignite cache: " + storeSes.cacheName(), e.getCause());
+ else
+ throw new CacheLoaderException("Failed to load cache", e.getCause());
+ }
+ finally {
+ U.shutdownNow(getClass(), pool, log);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void sessionEnd(boolean commit) throws CacheWriterException {
+ if (!storeSes.isWithinTransaction())
+ return;
+
+ List<Mutation> mutations = mutations();
+ if (mutations == null || mutations.isEmpty())
+ return;
+
+ CassandraSession ses = getCassandraSession();
+
+ try {
+ ses.execute(mutations);
+ }
+ finally {
+ mutations.clear();
+ U.closeQuiet(ses);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"unchecked"})
+ @Override public V load(final K key) throws CacheLoaderException {
+ if (key == null)
+ return null;
+
+ CassandraSession ses = getCassandraSession();
+
+ try {
+ return ses.execute(new ExecutionAssistant<V>() {
+ /** {@inheritDoc} */
+ @Override public boolean tableExistenceRequired() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getTable() {
+ return cassandraTable();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getStatement() {
+ return controller.getLoadStatement(cassandraTable(), false);
+ }
+
+ /** {@inheritDoc} */
+ @Override public BoundStatement bindStatement(PreparedStatement statement) {
+ return controller.bindKey(statement, key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public KeyValuePersistenceSettings getPersistenceSettings() {
+ return controller.getPersistenceSettings();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String operationName() {
+ return "READ";
+ }
+
+ /** {@inheritDoc} */
+ @Override public V process(Row row) {
+ return row == null ? null : (V)controller.buildValueObject(row);
+ }
+ });
+ }
+ finally {
+ U.closeQuiet(ses);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public Map<K, V> loadAll(Iterable<? extends K> keys) throws CacheLoaderException {
+ if (keys == null || !keys.iterator().hasNext())
+ return new HashMap<>();
+
+ CassandraSession ses = getCassandraSession();
+
+ try {
+ return ses.execute(new GenericBatchExecutionAssistant<Map<K, V>, K>() {
+ private Map<K, V> data = new HashMap<>();
+
+ /** {@inheritDoc} */
+ @Override public String getTable() {
+ return cassandraTable();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getStatement() {
+ return controller.getLoadStatement(cassandraTable(), true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public BoundStatement bindStatement(PreparedStatement statement, K key) {
+ return controller.bindKey(statement, key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public KeyValuePersistenceSettings getPersistenceSettings() {
+ return controller.getPersistenceSettings();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String operationName() {
+ return "BULK_READ";
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<K, V> processedData() {
+ return data;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void process(Row row) {
+ data.put((K)controller.buildKeyObject(row), (V)controller.buildValueObject(row));
+ }
+ }, keys);
+ }
+ finally {
+ U.closeQuiet(ses);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(final Cache.Entry<? extends K, ? extends V> entry) throws CacheWriterException {
+ if (entry == null || entry.getKey() == null)
+ return;
+
+ if (storeSes.isWithinTransaction()) {
+ accumulate(new WriteMutation(entry, cassandraTable(), controller));
+ return;
+ }
+
+ CassandraSession ses = getCassandraSession();
+
+ try {
+ ses.execute(new ExecutionAssistant<Void>() {
+ /** {@inheritDoc} */
+ @Override public boolean tableExistenceRequired() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getTable() {
+ return cassandraTable();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getStatement() {
+ return controller.getWriteStatement(cassandraTable());
+ }
+
+ /** {@inheritDoc} */
+ @Override public BoundStatement bindStatement(PreparedStatement statement) {
+ return controller.bindKeyValue(statement, entry.getKey(), entry.getValue());
+ }
+
+ /** {@inheritDoc} */
+ @Override public KeyValuePersistenceSettings getPersistenceSettings() {
+ return controller.getPersistenceSettings();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String operationName() {
+ return "WRITE";
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void process(Row row) {
+ return null;
+ }
+ });
+ }
+ finally {
+ U.closeQuiet(ses);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeAll(Collection<Cache.Entry<? extends K, ? extends V>> entries) throws CacheWriterException {
+ if (entries == null || entries.isEmpty())
+ return;
+
+ if (storeSes.isWithinTransaction()) {
+ for (Cache.Entry<?, ?> entry : entries)
+ accumulate(new WriteMutation(entry, cassandraTable(), controller));
+
+ return;
+ }
+
+ CassandraSession ses = getCassandraSession();
+
+ try {
+ ses.execute(new GenericBatchExecutionAssistant<Void, Cache.Entry<? extends K, ? extends V>>() {
+ /** {@inheritDoc} */
+ @Override public String getTable() {
+ return cassandraTable();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getStatement() {
+ return controller.getWriteStatement(cassandraTable());
+ }
+
+ /** {@inheritDoc} */
+ @Override public BoundStatement bindStatement(PreparedStatement statement,
+ Cache.Entry<? extends K, ? extends V> entry) {
+ return controller.bindKeyValue(statement, entry.getKey(), entry.getValue());
+ }
+
+ /** {@inheritDoc} */
+ @Override public KeyValuePersistenceSettings getPersistenceSettings() {
+ return controller.getPersistenceSettings();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String operationName() {
+ return "BULK_WRITE";
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean tableExistenceRequired() {
+ return true;
+ }
+ }, entries);
+ }
+ finally {
+ U.closeQuiet(ses);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void delete(final Object key) throws CacheWriterException {
+ if (key == null)
+ return;
+
+ if (storeSes.isWithinTransaction()) {
+ accumulate(new DeleteMutation(key, cassandraTable(), controller));
+ return;
+ }
+
+ CassandraSession ses = getCassandraSession();
+
+ try {
+ ses.execute(new ExecutionAssistant<Void>() {
+ /** {@inheritDoc} */
+ @Override public boolean tableExistenceRequired() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getTable() {
+ return cassandraTable();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getStatement() {
+ return controller.getDeleteStatement(cassandraTable());
+ }
+
+ /** {@inheritDoc} */
+ @Override public BoundStatement bindStatement(PreparedStatement statement) {
+ return controller.bindKey(statement, key);
+ }
+
+
+ /** {@inheritDoc} */
+ @Override public KeyValuePersistenceSettings getPersistenceSettings() {
+ return controller.getPersistenceSettings();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String operationName() {
+ return "DELETE";
+ }
+
+ /** {@inheritDoc} */
+ @Override public Void process(Row row) {
+ return null;
+ }
+ });
+ }
+ finally {
+ U.closeQuiet(ses);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void deleteAll(Collection<?> keys) throws CacheWriterException {
+ if (keys == null || keys.isEmpty())
+ return;
+
+ if (storeSes.isWithinTransaction()) {
+ for (Object key : keys)
+ accumulate(new DeleteMutation(key, cassandraTable(), controller));
+
+ return;
+ }
+
+ CassandraSession ses = getCassandraSession();
+
+ try {
+ ses.execute(new GenericBatchExecutionAssistant<Void, Object>() {
+ /** {@inheritDoc} */
+ @Override public String getTable() {
+ return cassandraTable();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String getStatement() {
+ return controller.getDeleteStatement(cassandraTable());
+ }
+
+ /** {@inheritDoc} */
+ @Override public BoundStatement bindStatement(PreparedStatement statement, Object key) {
+ return controller.bindKey(statement, key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public KeyValuePersistenceSettings getPersistenceSettings() {
+ return controller.getPersistenceSettings();
+ }
+
+ /** {@inheritDoc} */
+ @Override public String operationName() {
+ return "BULK_DELETE";
+ }
+ }, keys);
+ }
+ finally {
+ U.closeQuiet(ses);
+ }
+ }
+
+ /**
+ * Gets Cassandra session wrapper or creates a new one if it doesn't exist.
+ * This wrapper hides all the low-level Cassandra interaction details by providing only high-level methods.
+ *
+ * @return Cassandra session wrapper.
+ */
+ private CassandraSession getCassandraSession() {
+ return dataSrc.session(log != null ? log : new NullLogger());
+ }
+
+ /**
+ * Returns table name to use for all Cassandra based operations (READ/WRITE/DELETE).
+ *
+ * @return Table name.
+ */
+ private String cassandraTable() {
+ return controller.getPersistenceSettings().getTable() != null ?
+ controller.getPersistenceSettings().getTable() : storeSes.cacheName().trim().toLowerCase();
+ }
+
+ /**
+ * Accumulates mutation in the transaction buffer.
+ *
+ * @param mutation Mutation operation.
+ */
+ private void accumulate(Mutation mutation) {
+ //noinspection unchecked
+ List<Mutation> mutations = (List<Mutation>)storeSes.properties().get(TRANSACTION_BUFFER);
+
+ if (mutations == null) {
+ mutations = new LinkedList<>();
+ storeSes.properties().put(TRANSACTION_BUFFER, mutations);
+ }
+
+ mutations.add(mutation);
+ }
+
+ /**
+ * Returns all the mutations performed within the transaction.
+ *
+ * @return Mutations
+ */
+ private List<Mutation> mutations() {
+ //noinspection unchecked
+ return (List<Mutation>)storeSes.properties().get(TRANSACTION_BUFFER);
+ }
++
++ /** {@inheritDoc} */
++ @Override public String toString() {
++ return S.toString(CassandraCacheStore.class, this);
++ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java
----------------------------------------------------------------------
diff --cc modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java
index f582aac,0000000..1ba3c7d
mode 100644,000000..100644
--- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java
@@@ -1,647 -1,0 +1,656 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.datasource;
+
+import com.datastax.driver.core.AuthProvider;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.NettyOptions;
+import com.datastax.driver.core.PoolingOptions;
+import com.datastax.driver.core.ProtocolOptions;
+import com.datastax.driver.core.ProtocolVersion;
+import com.datastax.driver.core.SSLOptions;
+import com.datastax.driver.core.SocketOptions;
+import com.datastax.driver.core.policies.AddressTranslator;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
+import com.datastax.driver.core.policies.ReconnectionPolicy;
+import com.datastax.driver.core.policies.RetryPolicy;
+import com.datastax.driver.core.policies.SpeculativeExecutionPolicy;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.store.cassandra.session.CassandraSession;
+import org.apache.ignite.cache.store.cassandra.session.CassandraSessionImpl;
+import org.apache.ignite.internal.util.typedef.internal.U;
++import org.apache.ignite.internal.util.tostring.GridToStringExclude;
++import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Data source abstraction to specify configuration of the Cassandra session to be used.
+ */
+public class DataSource implements Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Null object, used as a replacement for those Cassandra connection options which
+ * don't support serialization (RetryPolicy, LoadBalancingPolicy, etc.).
+ */
+ private static final UUID NULL_OBJECT = UUID.fromString("45ffae47-3193-5910-84a2-048fe65735d9");
+
+ /** Number of rows to immediately fetch in CQL statement execution. */
+ private Integer fetchSize;
+
+ /** Consistency level for READ operations. */
+ private ConsistencyLevel readConsistency;
+
+ /** Consistency level for WRITE operations. */
+ private ConsistencyLevel writeConsistency;
+
+ /** Username to use for authentication. */
++ @GridToStringExclude
+ private String user;
+
+ /** Password to use for authentication. */
++ @GridToStringExclude
+ private String pwd;
+
+ /** Port to use for Cassandra connection. */
+ private Integer port;
+
+ /** List of contact points to connect to Cassandra cluster. */
+ private List<InetAddress> contactPoints;
+
+ /** List of contact points with ports to connect to Cassandra cluster. */
+ private List<InetSocketAddress> contactPointsWithPorts;
+
+ /** Maximum time to wait for schema agreement before returning from a DDL query. */
+ private Integer maxSchemaAgreementWaitSeconds;
+
+ /** The native protocol version to use. */
+ private Integer protoVer;
+
+ /** Compression to use for the transport. */
+ private String compression;
+
+ /** Use SSL for communications with Cassandra. */
+ private Boolean useSSL;
+
+ /** Enables metrics collection. */
+ private Boolean collectMetrix;
+
+ /** Enables JMX reporting of the metrics. */
+ private Boolean jmxReporting;
+
+ /** Credentials to use for authentication. */
+ private Credentials creds;
+
+ /** Load balancing policy to use. */
+ private LoadBalancingPolicy loadBalancingPlc;
+
+ /** Reconnection policy to use. */
+ private ReconnectionPolicy reconnectionPlc;
+
+ /** Retry policy to use. */
+ private RetryPolicy retryPlc;
+
+ /** Address translator to use. */
+ private AddressTranslator addrTranslator;
+
+ /** Speculative execution policy to use. */
+ private SpeculativeExecutionPolicy speculativeExecutionPlc;
+
+ /** Authentication provider to use. */
+ private AuthProvider authProvider;
+
+ /** SSL options to use. */
+ private SSLOptions sslOptions;
+
+ /** Connection pooling options to use. */
+ private PoolingOptions poolingOptions;
+
+ /** Socket options to use. */
+ private SocketOptions sockOptions;
+
+ /** Netty options to use for connection. */
+ private NettyOptions nettyOptions;
+
+ /** Cassandra session wrapper instance. */
+ private volatile CassandraSession ses;
+
+ /**
+ * Sets user name to use for authentication.
+ *
+ * @param user user name
+ */
+ @SuppressWarnings("UnusedDeclaration")
+ public void setUser(String user) {
+ this.user = user;
+
+ invalidate();
+ }
+
+ /**
+ * Sets password to use for authentication.
+ *
+ * @param pwd password
+ */
+ @SuppressWarnings("UnusedDeclaration")
+ public void setPassword(String pwd) {
+ this.pwd = pwd;
+
+ invalidate();
+ }
+
+ /**
+ * Sets port to use for Cassandra connection.
+ *
+ * @param port port
+ */
+ @SuppressWarnings("UnusedDeclaration")
+ public void setPort(int port) {
+ this.port = port;
+
+ invalidate();
+ }
+
+ /**
+ * Sets list of contact points to connect to Cassandra cluster.
+ *
+ * @param points contact points
+ */
+ public void setContactPoints(String... points) {
+ if (points == null || points.length == 0)
+ return;
+
+ for (String point : points) {
+ if (point.contains(":")) {
+ if (contactPointsWithPorts == null)
+ contactPointsWithPorts = new LinkedList<>();
+
+ String[] chunks = point.split(":");
+
+ try {
+ contactPointsWithPorts.add(InetSocketAddress.createUnresolved(chunks[0].trim(), Integer.parseInt(chunks[1].trim())));
+ }
+ catch (Throwable e) {
+ throw new IllegalArgumentException("Incorrect contact point '" + point + "' specified for Cassandra cache storage", e);
+ }
+ }
+ else {
+ if (contactPoints == null)
+ contactPoints = new LinkedList<>();
+
+ try {
+ contactPoints.add(InetAddress.getByName(point));
+ }
+ catch (Throwable e) {
+ throw new IllegalArgumentException("Incorrect contact point '" + point + "' specified for Cassandra cache storage", e);
+ }
+ }
+ }
+
+ invalidate();
+ }
+
+ /** Sets maximum time to wait for schema agreement before returning from a DDL query. */
+ @SuppressWarnings("UnusedDeclaration")
+ public void setMaxSchemaAgreementWaitSeconds(int seconds) {
+ maxSchemaAgreementWaitSeconds = seconds;
+
+ invalidate();
+ }
+
+ /**
+ * Sets the native protocol version to use.
+ *
+ * @param ver version number
+ */
+ @SuppressWarnings("UnusedDeclaration")
+ public void setProtocolVersion(int ver) {
+ protoVer = ver;
+
+ invalidate();
+ }
+
+ /**
+ * Sets compression algorithm to use for the transport.
+ *
+ * @param compression Compression algorithm.
+ */
+ @SuppressWarnings("UnusedDeclaration")
+ public void setCompression(String compression) {
+ this.compression = compression == null || compression.trim().isEmpty() ? null : compression.trim();
+
+ try {
+ if (this.compression != null)
+ ProtocolOptions.Compression.valueOf(this.compression);
+ }
+ catch (Throwable e) {
+ throw new IgniteException("Incorrect compression '" + compression + "' specified for Cassandra connection", e);
+ }
+
+ invalidate();
+ }
+
+ /**
+ * Enables SSL for communications with Cassandra.
+ *
+ * @param use Flag to enable/disable SSL.
+ */
+ @SuppressWarnings("UnusedDeclaration")
+ public void setUseSSL(boolean use) {
+ useSSL = use;
+
+ invalidate();
+ }
+
+ /**
+  * Turns metrics collection on or off and resets the cached session.
+  *
+  * @param collect {@code true} to collect metrics, {@code false} to disable collection.
+  */
+ @SuppressWarnings("UnusedDeclaration")
+ public void setCollectMetrix(boolean collect) {
+     this.collectMetrix = collect;
+
+     invalidate();
+ }
+
+ /**
+  * Turns JMX reporting of the metrics on or off and resets the cached session.
+  *
+  * @param enableReporting {@code true} to enable JMX reporting, {@code false} to disable it.
+  */
+ @SuppressWarnings("UnusedDeclaration")
+ public void setJmxReporting(boolean enableReporting) {
+     this.jmxReporting = enableReporting;
+
+     invalidate();
+ }
+
+ /**
+  * Sets the number of rows to fetch immediately when a CQL statement is executed
+  * and resets the cached session.
+  *
+  * @param size Number of rows to fetch.
+  */
+ @SuppressWarnings("UnusedDeclaration")
+ public void setFetchSize(int size) {
+     this.fetchSize = size;
+
+     invalidate();
+ }
+
+ /**
+  * Sets the consistency level for READ operations and resets the cached session.
+  * The string is converted by {@code parseConsistencyLevel}.
+  *
+  * @param level Consistency level name.
+  */
+ public void setReadConsistency(String level) {
+     this.readConsistency = parseConsistencyLevel(level);
+
+     invalidate();
+ }
+
+ /**
+  * Sets the consistency level for WRITE operations and resets the cached session.
+  * The string is converted by {@code parseConsistencyLevel}.
+  *
+  * @param level Consistency level name.
+  */
+ public void setWriteConsistency(String level) {
+     this.writeConsistency = parseConsistencyLevel(level);
+
+     invalidate();
+ }
+
+ /**
+  * Sets the credentials to use for authentication and resets the cached session.
+  *
+  * @param creds Credentials.
+  */
+ public void setCredentials(Credentials creds) {
+     // The parameter shadows the field, hence the explicit qualifier.
+     this.creds = creds;
+
+     invalidate();
+ }
+
+ /**
+  * Sets the load balancing policy and resets the cached session.
+  *
+  * @param plc Load balancing policy.
+  */
+ public void setLoadBalancingPolicy(LoadBalancingPolicy plc) {
+     this.loadBalancingPlc = plc;
+
+     invalidate();
+ }
+
+ /**
+  * Sets the reconnection policy and resets the cached session.
+  *
+  * @param plc Reconnection policy.
+  */
+ @SuppressWarnings("UnusedDeclaration")
+ public void setReconnectionPolicy(ReconnectionPolicy plc) {
+     this.reconnectionPlc = plc;
+
+     invalidate();
+ }
+
+ /**
+  * Sets the retry policy and resets the cached session.
+  *
+  * @param plc Retry policy.
+  */
+ @SuppressWarnings("UnusedDeclaration")
+ public void setRetryPolicy(RetryPolicy plc) {
+     this.retryPlc = plc;
+
+     invalidate();
+ }
+
+ /**
+  * Sets the address translator and resets the cached session.
+  *
+  * @param translator Address translator.
+  */
+ @SuppressWarnings("UnusedDeclaration")
+ public void setAddressTranslator(AddressTranslator translator) {
+     this.addrTranslator = translator;
+
+     invalidate();
+ }
+
+ /**
+  * Sets the speculative execution policy and resets the cached session.
+  *
+  * @param plc Speculative execution policy.
+  */
+ @SuppressWarnings("UnusedDeclaration")
+ public void setSpeculativeExecutionPolicy(SpeculativeExecutionPolicy plc) {
+     this.speculativeExecutionPlc = plc;
+
+     invalidate();
+ }
+
+ /**
+  * Sets the authentication provider and resets the cached session.
+  *
+  * @param provider Authentication provider.
+  */
+ @SuppressWarnings("UnusedDeclaration")
+ public void setAuthProvider(AuthProvider provider) {
+     this.authProvider = provider;
+
+     invalidate();
+ }
+
+ /**
+  * Sets the SSL options and resets the cached session.
+  *
+  * @param options SSL options.
+  */
+ @SuppressWarnings("UnusedDeclaration")
+ public void setSslOptions(SSLOptions options) {
+     this.sslOptions = options;
+
+     invalidate();
+ }
+
+ /**
+  * Sets the pooling options and resets the cached session.
+  *
+  * @param options Pooling options to use.
+  */
+ @SuppressWarnings("UnusedDeclaration")
+ public void setPoolingOptions(PoolingOptions options) {
+     this.poolingOptions = options;
+
+     invalidate();
+ }
+
+ /**
+  * Sets the socket options and resets the cached session.
+  *
+  * @param options Socket options to use.
+  */
+ @SuppressWarnings("UnusedDeclaration")
+ public void setSocketOptions(SocketOptions options) {
+     this.sockOptions = options;
+
+     invalidate();
+ }
+
+ /**
+  * Sets the netty options and resets the cached session.
+  *
+  * @param options Netty options to use.
+  */
+ @SuppressWarnings("UnusedDeclaration")
+ public void setNettyOptions(NettyOptions options) {
+     this.nettyOptions = options;
+
+     invalidate();
+ }
+
+ /**
+  * Creates Cassandra session wrapper if it wasn't created yet and returns it.
+  * <p>
+  * The created wrapper is cached in {@code ses} and returned by subsequent calls until the cache is reset.
+  * The cluster builder is configured only from the settings that are not {@code null},
+  * in the order they are listed in the method body.
+  *
+  * @param log Logger passed to the created session wrapper.
+  * @return Cassandra session wrapper.
+  * @throws IgniteException If the configured compression name doesn't match any
+  *      {@code ProtocolOptions.Compression} constant.
+  */
+ @SuppressWarnings("deprecation")
+ public synchronized CassandraSession session(IgniteLogger log) {
+     if (ses != null)
+         return ses;
+
+     Cluster.Builder builder = Cluster.builder();
+
+     if (user != null)
+         builder = builder.withCredentials(user, pwd);
+
+     if (port != null)
+         builder = builder.withPort(port);
+
+     if (contactPoints != null)
+         builder = builder.addContactPoints(contactPoints);
+
+     if (contactPointsWithPorts != null)
+         builder = builder.addContactPointsWithPorts(contactPointsWithPorts);
+
+     if (maxSchemaAgreementWaitSeconds != null)
+         builder = builder.withMaxSchemaAgreementWaitSeconds(maxSchemaAgreementWaitSeconds);
+
+     if (protoVer != null)
+         builder = builder.withProtocolVersion(ProtocolVersion.fromInt(protoVer));
+
+     if (compression != null) {
+         try {
+             // Enum.valueOf() is case-sensitive and the compression constants are upper case, so the name
+             // must be upper-cased (lower-casing it could never match). Locale.ROOT keeps the conversion
+             // independent of the default locale.
+             builder = builder.withCompression(
+                 ProtocolOptions.Compression.valueOf(compression.trim().toUpperCase(java.util.Locale.ROOT)));
+         }
+         catch (IllegalArgumentException e) {
+             throw new IgniteException("Incorrect compression option '" + compression + "' specified for Cassandra connection", e);
+         }
+     }
+
+     if (useSSL != null && useSSL)
+         builder = builder.withSSL();
+
+     if (sslOptions != null)
+         builder = builder.withSSL(sslOptions);
+
+     // Metrics and JMX reporting are only touched when they were explicitly switched off.
+     if (collectMetrix != null && !collectMetrix)
+         builder = builder.withoutMetrics();
+
+     if (jmxReporting != null && !jmxReporting)
+         builder = builder.withoutJMXReporting();
+
+     if (creds != null)
+         builder = builder.withCredentials(creds.getUser(), creds.getPassword());
+
+     if (loadBalancingPlc != null)
+         builder = builder.withLoadBalancingPolicy(loadBalancingPlc);
+
+     if (reconnectionPlc != null)
+         builder = builder.withReconnectionPolicy(reconnectionPlc);
+
+     if (retryPlc != null)
+         builder = builder.withRetryPolicy(retryPlc);
+
+     if (addrTranslator != null)
+         builder = builder.withAddressTranslator(addrTranslator);
+
+     if (speculativeExecutionPlc != null)
+         builder = builder.withSpeculativeExecutionPolicy(speculativeExecutionPlc);
+
+     if (authProvider != null)
+         builder = builder.withAuthProvider(authProvider);
+
+     if (poolingOptions != null)
+         builder = builder.withPoolingOptions(poolingOptions);
+
+     if (sockOptions != null)
+         builder = builder.withSocketOptions(sockOptions);
+
+     if (nettyOptions != null)
+         builder = builder.withNettyOptions(nettyOptions);
+
+     return ses = new CassandraSessionImpl(builder, fetchSize, readConsistency, writeConsistency, log);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Writes the configuration fields in a fixed order; {@code readExternal} reads them back in the same order,
+  * so the two methods must be changed together. Policies and option objects go through the private
+  * {@code writeObject} helper, which substitutes {@code NULL_OBJECT} for values that are {@code null}
+  * or not {@link Serializable}. The cached session ({@code ses}) is not written.
+  * <p>
+  * NOTE(review): {@code retryPlc} is configurable and used when the session is built, but it is not
+  * written here - confirm whether that omission is intentional.
+  */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(fetchSize);
+ out.writeObject(readConsistency);
+ out.writeObject(writeConsistency);
+ U.writeString(out, user);
+ U.writeString(out, pwd);
+ out.writeObject(port);
+ out.writeObject(contactPoints);
+ out.writeObject(contactPointsWithPorts);
+ out.writeObject(maxSchemaAgreementWaitSeconds);
+ out.writeObject(protoVer);
+ U.writeString(out, compression);
+ out.writeObject(useSSL);
+ out.writeObject(collectMetrix);
+ out.writeObject(jmxReporting);
+ out.writeObject(creds);
+ writeObject(out, loadBalancingPlc);
+ writeObject(out, reconnectionPlc);
+ writeObject(out, addrTranslator);
+ writeObject(out, speculativeExecutionPlc);
+ writeObject(out, authProvider);
+ writeObject(out, sslOptions);
+ writeObject(out, poolingOptions);
+ writeObject(out, sockOptions);
+ writeObject(out, nettyOptions);
+ }
+
+ /**
+  * {@inheritDoc}
+  * <p>
+  * Restores the configuration fields in exactly the order {@code writeExternal} writes them.
+  * Policies and option objects are read through the private {@code readObject} helper, which maps
+  * the {@code NULL_OBJECT} placeholder back to {@code null}. The cached session ({@code ses}) and
+  * {@code retryPlc} are not assigned by this method.
+  */
+ @SuppressWarnings("unchecked")
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ fetchSize = (Integer)in.readObject();
+ readConsistency = (ConsistencyLevel)in.readObject();
+ writeConsistency = (ConsistencyLevel)in.readObject();
+ user = U.readString(in);
+ pwd = U.readString(in);
+ port = (Integer)in.readObject();
+ contactPoints = (List<InetAddress>)in.readObject();
+ contactPointsWithPorts = (List<InetSocketAddress>)in.readObject();
+ maxSchemaAgreementWaitSeconds = (Integer)in.readObject();
+ protoVer = (Integer)in.readObject();
+ compression = U.readString(in);
+ useSSL = (Boolean)in.readObject();
+ collectMetrix = (Boolean)in.readObject();
+ jmxReporting = (Boolean)in.readObject();
+ creds = (Credentials)in.readObject();
+ loadBalancingPlc = (LoadBalancingPolicy)readObject(in);
+ reconnectionPlc = (ReconnectionPolicy)readObject(in);
+ addrTranslator = (AddressTranslator)readObject(in);
+ speculativeExecutionPlc = (SpeculativeExecutionPolicy)readObject(in);
+ authProvider = (AuthProvider)readObject(in);
+ sslOptions = (SSLOptions)readObject(in);
+ poolingOptions = (PoolingOptions)readObject(in);
+ sockOptions = (SocketOptions)readObject(in);
+ nettyOptions = (NettyOptions)readObject(in);
+ }
+
+ /**
+  * Helper method used to serialize class members. Values that are {@code null} or not
+  * {@link Serializable} are replaced with the {@code NULL_OBJECT} placeholder.
+  *
+  * @param out Stream to write the object to.
+  * @param obj Object to be written.
+  * @throws IOException If an I/O error occurs while writing.
+  */
+ private void writeObject(ObjectOutput out, Object obj) throws IOException {
+     // instanceof evaluates to false for null, so one check covers both the null
+     // and the non-serializable case.
+     Object payload = obj instanceof Serializable ? obj : NULL_OBJECT;
+
+     out.writeObject(payload);
+ }
+
+ /**
+  * Helper method used to deserialize class members. The {@code NULL_OBJECT} placeholder
+  * is translated back to {@code null}.
+  *
+  * @param in Stream to read data from in order to restore the object.
+  * @return Deserialized object, or {@code null} if the placeholder was read.
+  * @throws IOException If an I/O error occurs while reading.
+  * @throws ClassNotFoundException If the class for an object being restored cannot be found.
+  */
+ private Object readObject(ObjectInput in) throws IOException, ClassNotFoundException {
+     Object restored = in.readObject();
+
+     if (NULL_OBJECT.equals(restored))
+         return null;
+
+     return restored;
+ }
+
+ /**
+  * Parses consistency level provided as string.
+  * <p>
+  * The value is trimmed and upper-cased with {@code Locale.ROOT} before the lookup, so the result
+  * doesn't depend on the default locale of the JVM.
+  *
+  * @param level Consistency level string, may be {@code null}.
+  * @return Consistency level, or {@code null} if {@code level} is {@code null}.
+  * @throws IgniteException If the string doesn't match any consistency level.
+  */
+ private ConsistencyLevel parseConsistencyLevel(String level) {
+     if (level == null)
+         return null;
+
+     try {
+         return ConsistencyLevel.valueOf(level.trim().toUpperCase(java.util.Locale.ROOT));
+     }
+     catch (IllegalArgumentException e) {
+         throw new IgniteException("Incorrect consistency level '" + level + "' specified for Cassandra connection", e);
+     }
+ }
+
+ /**
+  * Invalidates session: clears the cached session reference so that the next
+  * {@code session(IgniteLogger)} call builds a new one.
+  */
+ private synchronized void invalidate() {
+     this.ses = null;
+ }
++
++ /**
++  * {@inheritDoc}
++  * <p>
++  * Delegates to {@code S.toString} with {@code DataSource.class} and this instance.
++  * NOTE(review): confirm that the generated string does not expose the {@code pwd} field.
++  */
++ @Override public String toString() {
++ return S.toString(DataSource.class, this);
++ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/IgniteServices.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryClassDescriptor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryObjectBuilderImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 5f0b8a0,4d59d50..8187e8f
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@@ -3076,8 -3233,8 +3076,8 @@@ public abstract class GridCacheAdapter<
}
/** {@inheritDoc} */
- @Override public CacheMetrics clusterMetrics() {
+ @Override public final CacheMetrics clusterMetrics() {
- return clusterMetrics(ctx.grid().cluster().forCacheNodes(ctx.name()));
+ return clusterMetrics(ctx.grid().cluster().forDataNodes(ctx.name()));
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcRequestHandler.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index aeb3ef4,3690f35..d26242d
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@@ -1587,9 -1590,9 +1590,9 @@@ public class GridServiceProcessor exten
else
topVer = new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion(), 0);
- depExe.submit(new BusyRunnable() {
+ depExe.execute(new BusyRunnable() {
@Override public void run0() {
- ClusterNode oldest = CU.oldestAliveCacheServerNode(cache.context().shared(), topVer);
+ ClusterNode oldest = ctx.discovery().oldestAliveCacheServerNode(topVer);
if (oldest != null && oldest.isLocal()) {
final Collection<GridServiceDeployment> retries = new ConcurrentLinkedQueue<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java
index ee5b65c,43017db..8f8d78a
--- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java
@@@ -104,24 -104,9 +104,25 @@@ public abstract class AbstractAffinityF
}
/**
+ * @param backups Number of backups.
* @throws Exception If failed.
*/
+ public void testNullKeyForPartitionCalculation() throws Exception {
+ AffinityFunction aff = affinityFunction();
+
+ try {
+ aff.partition(null);
+
+ fail("Should throw IllegalArgumentException due to NULL affinity key.");
+ } catch (IllegalArgumentException e) {
+ e.getMessage().contains("Null key is passed for a partition calculation. " +
+ "Make sure that an affinity key that is used is initialized properly.");
+ }
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
protected void checkNodeRemoved(int backups) throws Exception {
checkNodeRemoved(backups, 1, 1);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 554bb3d,deec72a..1e73e79
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@@ -39,10 -39,7 +39,11 @@@ import org.apache.ignite.cache.store.jd
import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreTest;
import org.apache.ignite.cache.store.jdbc.GridCacheJdbcBlobStoreMultithreadedSelfTest;
import org.apache.ignite.cache.store.jdbc.GridCacheJdbcBlobStoreSelfTest;
+import org.apache.ignite.internal.managers.communication.IgniteCommunicationBalanceMultipleConnectionsTest;
+import org.apache.ignite.internal.managers.communication.IgniteCommunicationBalanceTest;
+import org.apache.ignite.internal.managers.communication.IgniteIoTestMessagesTest;
+import org.apache.ignite.internal.managers.communication.IgniteVariousConnectionNumberTest;
+ import org.apache.ignite.cache.store.jdbc.JdbcTypesDefaultTransformerTest;
import org.apache.ignite.internal.processors.cache.CacheAffinityCallSelfTest;
import org.apache.ignite.internal.processors.cache.CacheDeferredDeleteSanitySelfTest;
import org.apache.ignite.internal.processors.cache.CacheEntryProcessorCopySelfTest;
http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
index 2d06f3a,350b715..b28619c
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
@@@ -66,7 -66,7 +66,8 @@@ import org.apache.ignite.internal.proce
import org.apache.ignite.internal.processors.service.IgniteServiceDeploymentClassLoadingDefaultMarshallerTest;
import org.apache.ignite.internal.processors.service.IgniteServiceDeploymentClassLoadingJdkMarshallerTest;
import org.apache.ignite.internal.processors.service.IgniteServiceDeploymentClassLoadingOptimizedMarshallerTest;
+import org.apache.ignite.internal.processors.service.IgniteServiceDynamicCachesSelfTest;
+ import org.apache.ignite.internal.processors.service.IgniteServiceProxyTimeoutInitializedTest;
import org.apache.ignite.internal.processors.service.IgniteServiceReassignmentTest;
import org.apache.ignite.internal.processors.service.ServicePredicateAccessCacheTest;
import org.apache.ignite.internal.util.GridStartupWithUndefinedIgniteHomeSelfTest;
@@@ -142,7 -142,7 +143,8 @@@ public class IgniteKernalSelfTestSuite
suite.addTestSuite(GridServiceProxyNodeStopSelfTest.class);
suite.addTestSuite(GridServiceProxyClientReconnectSelfTest.class);
suite.addTestSuite(IgniteServiceReassignmentTest.class);
+ suite.addTestSuite(IgniteServiceProxyTimeoutInitializedTest.class);
+ suite.addTestSuite(IgniteServiceDynamicCachesSelfTest.class);
suite.addTestSuite(IgniteServiceDeploymentClassLoadingDefaultMarshallerTest.class);
suite.addTestSuite(IgniteServiceDeploymentClassLoadingOptimizedMarshallerTest.class);
http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 5df44db,362ddd8..c541185
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@@ -794,40 -774,29 +794,45 @@@ public class IgniteH2Indexing implement
throws IgniteCheckedException {
final Connection conn = connectionForSpace(spaceName);
- initLocalQueryContext(conn, enforceJoinOrder, filters);
+ setupConnection(conn, false, enforceJoinOrder);
- Prepared p = null;
+ final PreparedStatement stmt = preparedStatementWithParams(conn, qry, params, true);
- try {
- final PreparedStatement stmt = preparedStatementWithParams(conn, qry, params, true);
++ Prepared p = GridSqlQueryParser.prepared((JdbcPreparedStatement)stmt);
+
- p = GridSqlQueryParser.prepared((JdbcPreparedStatement) stmt);
++ if (!p.isQuery()) {
++ GridH2QueryContext.clearThreadLocal();
+
- if (!p.isQuery()) {
- GridH2QueryContext.clearThreadLocal();
++ SqlFieldsQuery fldsQry = new SqlFieldsQuery(qry);
+
- SqlFieldsQuery fldsQry = new SqlFieldsQuery(qry);
++ if (params != null)
++ fldsQry.setArgs(params.toArray());
+
- if (params != null)
- fldsQry.setArgs(params.toArray());
++ fldsQry.setEnforceJoinOrder(enforceJoinOrder);
++ fldsQry.setTimeout(timeout, TimeUnit.MILLISECONDS);
+
- fldsQry.setEnforceJoinOrder(enforceJoinOrder);
- fldsQry.setTimeout(timeout, TimeUnit.MILLISECONDS);
++ return dmlProc.updateLocalSqlFields(spaceName, stmt, fldsQry, filters, cancel);
++ }
+
- return dmlProc.updateLocalSqlFields(spaceName, stmt, fldsQry, filters, cancel);
- }
+ List<GridQueryFieldMetadata> meta;
- List<GridQueryFieldMetadata> meta;
+ try {
+ meta = meta(stmt.getMetaData());
+ }
+ catch (SQLException e) {
+ throw new IgniteCheckedException("Cannot prepare query metadata", e);
+ }
- try {
- meta = meta(stmt.getMetaData());
- }
- catch (SQLException e) {
- throw new IgniteCheckedException("Cannot prepare query metadata", e);
- }
+ final GridH2QueryContext ctx = new GridH2QueryContext(nodeId, nodeId, 0, LOCAL)
+ .filter(filters).distributedJoins(false);
- return new GridQueryFieldsResultAdapter(meta, null) {
- @Override public GridCloseableIterator<List<?>> iterator() throws IgniteCheckedException{
+ return new GridQueryFieldsResultAdapter(meta, null) {
+ @Override public GridCloseableIterator<List<?>> iterator() throws IgniteCheckedException {
+ assert GridH2QueryContext.get() == null;
+
+ GridH2QueryContext.set(ctx);
+
+ try {
ResultSet rs = executeSqlQueryWithTimer(spaceName, stmt, conn, qry, params, timeout, cancel);
return new FieldsIterator(rs);
http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------