You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/22 15:16:23 UTC

[43/50] [abbrv] ignite git commit: Merge remote-tracking branch 'remotes/community/ignite-1.7.4' into ignite-1.8.2

Merge remote-tracking branch 'remotes/community/ignite-1.7.4' into ignite-1.8.2

# Conflicts:
#	modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
#	modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java
#	modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateInvokeRequest.java
#	modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcRequestHandler.java
#	modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
#	modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
#	modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
#	modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
#	modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/64247b92
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/64247b92
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/64247b92

Branch: refs/heads/master
Commit: 64247b9228451e46abb8029e09c7fc6ed4e16d2d
Parents: 147277d 8dd4ada
Author: sboikov <sb...@gridgain.com>
Authored: Mon Dec 19 15:54:39 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Mon Dec 19 15:54:39 2016 +0300

----------------------------------------------------------------------
 .../store/cassandra/CassandraCacheStore.java    |   9 +-
 .../store/cassandra/datasource/DataSource.java  |   9 +
 .../rest/RestProcessorMultiStartSelfTest.java   |  48 +-
 .../java/org/apache/ignite/IgniteServices.java  |  16 +
 .../apache/ignite/IgniteSystemProperties.java   |   6 +
 .../rendezvous/RendezvousAffinityFunction.java  |  80 ++-
 .../ignite/cache/store/CacheStoreAdapter.java   |   6 +
 .../cache/store/jdbc/CacheJdbcPojoStore.java    |  19 +-
 .../store/jdbc/JdbcTypesDefaultTransformer.java | 112 ++--
 .../apache/ignite/internal/IgniteKernal.java    |  28 +-
 .../ignite/internal/IgniteServicesImpl.java     |   9 +-
 .../internal/binary/BinaryClassDescriptor.java  |  12 +-
 .../ignite/internal/binary/BinaryUtils.java     |  10 +-
 .../binary/builder/BinaryObjectBuilderImpl.java |  11 +-
 .../discovery/GridDiscoveryManager.java         | 118 +---
 .../affinity/GridAffinityProcessor.java         |   2 +-
 .../processors/cache/CacheLockCandidates.java   |  42 ++
 .../cache/CacheLockCandidatesList.java          |  71 +++
 .../cache/CacheStoreBalancingWrapper.java       |   6 +
 .../processors/cache/GridCacheAdapter.java      |   8 +-
 .../processors/cache/GridCacheEntryEx.java      |   3 +-
 .../cache/GridCacheLoaderWriterStore.java       |   6 +
 .../processors/cache/GridCacheMapEntry.java     | 117 +++-
 .../processors/cache/GridCacheMvcc.java         | 376 +++++++----
 .../processors/cache/GridCacheMvccCallback.java |   4 +-
 .../cache/GridCacheMvccCandidate.java           |  80 +--
 .../processors/cache/GridCacheMvccManager.java  |  19 +-
 .../GridCachePartitionExchangeManager.java      | 157 ++---
 .../processors/cache/GridCachePreloader.java    |  11 +-
 .../cache/GridCachePreloaderAdapter.java        |   5 +-
 .../processors/cache/GridCacheProcessor.java    |  10 +-
 .../processors/cache/GridCacheUtils.java        |  17 -
 .../binary/CacheObjectBinaryProcessorImpl.java  |   3 +-
 .../CacheDataStructuresManager.java             |   6 +-
 .../distributed/GridDistributedCacheEntry.java  | 303 +++------
 .../dht/GridClientPartitionTopology.java        | 120 ++--
 .../distributed/dht/GridDhtCacheEntry.java      |  32 +-
 .../distributed/dht/GridDhtLockFuture.java      |  34 +-
 .../dht/GridDhtPartitionTopology.java           |  28 +-
 .../dht/GridDhtPartitionTopologyImpl.java       | 284 +++++----
 .../dht/GridDhtTransactionalCacheAdapter.java   |   1 -
 .../distributed/dht/GridDhtTxPrepareFuture.java |   5 +-
 .../colocated/GridDhtColocatedLockFuture.java   |   8 +-
 .../dht/preloader/GridDhtPartitionDemander.java | 230 ++++---
 .../dht/preloader/GridDhtPartitionFullMap.java  |  18 +-
 .../GridDhtPartitionsExchangeFuture.java        |  56 +-
 .../dht/preloader/GridDhtPreloader.java         |   9 +-
 .../distributed/near/GridNearCacheEntry.java    |  44 +-
 .../distributed/near/GridNearLockFuture.java    |   3 +-
 .../near/GridNearTransactionalCache.java        |   5 +-
 .../cache/local/GridLocalCacheEntry.java        | 173 ++----
 .../cache/local/GridLocalLockFuture.java        |   2 +-
 .../cache/query/GridCacheQueryManager.java      |  22 +-
 .../cache/transactions/IgniteTxHandler.java     |   2 +-
 .../cache/transactions/IgniteTxManager.java     |   5 +-
 .../closure/GridClosureProcessor.java           |  31 +-
 .../internal/processors/job/GridJobWorker.java  |  76 ++-
 .../processors/odbc/OdbcRequestHandler.java     |  14 +-
 .../platform/PlatformContextImpl.java           |   2 +-
 .../dotnet/PlatformDotNetCacheStore.java        |  11 +
 .../platform/services/PlatformServices.java     |   2 +-
 .../platform/utils/PlatformUtils.java           |  28 +
 .../processors/rest/GridRestProcessor.java      |  15 +
 .../service/GridServiceProcessor.java           |  15 +-
 .../processors/service/GridServiceProxy.java    |  18 +-
 .../processors/task/GridTaskWorker.java         |   7 +
 .../internal/visor/query/VisorQueryJob.java     |   2 +-
 .../ignite/marshaller/jdk/JdkMarshaller.java    |   4 +-
 .../optimized/OptimizedMarshaller.java          |   8 +-
 .../ignite/spi/discovery/tcp/ServerImpl.java    |  41 +-
 .../tcp/internal/TcpDiscoveryStatistics.java    |   4 +
 .../resources/META-INF/classnames.properties    |  86 ++-
 .../AbstractAffinityFunctionSelfTest.java       |   2 +-
 .../jdbc/JdbcTypesDefaultTransformerTest.java   | 283 +++++++++
 .../IgniteComputeTopologyExceptionTest.java     |   5 +-
 .../binary/BinaryMarshallerSelfTest.java        |  66 ++
 .../GridDiscoveryManagerAliveCacheSelfTest.java |   2 +-
 .../CacheSerializableTransactionsTest.java      | 604 +++++++++++++++++-
 .../cache/GridCacheMvccFlagsTest.java           |   8 +-
 .../cache/GridCacheMvccPartitionedSelfTest.java | 334 ++++++++--
 .../processors/cache/GridCacheMvccSelfTest.java | 212 +++----
 .../GridCachePartitionedAffinitySpreadTest.java |   7 +-
 .../processors/cache/GridCacheTestEntryEx.java  |  77 +--
 ...heapCacheMetricsForClusterGroupSelfTest.java | 141 +++++
 .../cache/OffheapCacheOnClientsTest.java        | 143 +++++
 .../distributed/dht/GridCacheDhtTestUtils.java  | 232 -------
 .../GridCacheRebalancingSyncSelfTest.java       |   2 +
 .../CacheOffHeapAndSwapMetricsSelfTest.java     | 621 -------------------
 ...LocalCacheOffHeapAndSwapMetricsSelfTest.java | 621 +++++++++++++++++++
 .../closure/GridClosureSerializationTest.java   | 177 ++++++
 ...gniteServiceProxyTimeoutInitializedTest.java | 284 +++++++++
 .../loadtests/hashmap/GridHashMapLoadTest.java  |   7 +-
 .../ignite/testsuites/IgniteBasicTestSuite.java |   2 +
 .../IgniteCacheMetricsSelfTestSuite.java        |   6 +-
 .../ignite/testsuites/IgniteCacheTestSuite.java |   2 +
 .../testsuites/IgniteCacheTestSuite2.java       |   2 +
 .../testsuites/IgniteKernalSelfTestSuite.java   |   2 +
 .../resources/META-INF/classnames.properties    | 114 ++++
 .../processors/query/h2/IgniteH2Indexing.java   |  77 ++-
 .../h2/twostep/GridReduceQueryExecutor.java     |  14 +-
 ...niteCachePartitionedFieldsQuerySelfTest.java |  25 +
 101 files changed, 4783 insertions(+), 2473 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
----------------------------------------------------------------------
diff --cc modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
index 9058837,0000000..b4bed0d
mode 100644,000000..100644
--- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
@@@ -1,519 -1,0 +1,522 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.cache.store.cassandra;
 +
 +import com.datastax.driver.core.BoundStatement;
 +import com.datastax.driver.core.PreparedStatement;
 +import com.datastax.driver.core.Row;
 +import java.util.ArrayList;
 +import java.util.Collection;
 +import java.util.LinkedList;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.HashMap;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.Future;
 +import javax.cache.Cache;
 +import javax.cache.integration.CacheLoaderException;
 +import javax.cache.integration.CacheWriterException;
 +import org.apache.ignite.IgniteCheckedException;
 +import org.apache.ignite.IgniteLogger;
 +import org.apache.ignite.cache.store.CacheStore;
 +import org.apache.ignite.cache.store.CacheStoreSession;
 +import org.apache.ignite.cache.store.cassandra.datasource.DataSource;
 +import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings;
 +import org.apache.ignite.cache.store.cassandra.persistence.PersistenceController;
 +import org.apache.ignite.cache.store.cassandra.session.CassandraSession;
 +import org.apache.ignite.cache.store.cassandra.session.ExecutionAssistant;
 +import org.apache.ignite.cache.store.cassandra.session.GenericBatchExecutionAssistant;
 +import org.apache.ignite.cache.store.cassandra.session.LoadCacheCustomQueryWorker;
- import org.apache.ignite.cache.store.cassandra.session.transaction.DeleteMutation;
- import org.apache.ignite.cache.store.cassandra.session.transaction.Mutation;
- import org.apache.ignite.cache.store.cassandra.session.transaction.WriteMutation;
++import org.apache.ignite.internal.util.typedef.internal.S;
 +import org.apache.ignite.internal.util.typedef.internal.U;
 +import org.apache.ignite.lang.IgniteBiInClosure;
 +import org.apache.ignite.logger.NullLogger;
 +import org.apache.ignite.resources.CacheStoreSessionResource;
 +import org.apache.ignite.resources.LoggerResource;
 +
 +/**
 + * Implementation of {@link CacheStore} backed by Cassandra database.
 + *
 + * @param <K> Ignite cache key type.
 + * @param <V> Ignite cache value type.
 + */
 +public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
 +    /** Buffer to store mutations performed withing transaction. */
 +    private static final String TRANSACTION_BUFFER = "CASSANDRA_TRANSACTION_BUFFER";
 +
 +    /** Auto-injected store session. */
 +    @SuppressWarnings("unused")
 +    @CacheStoreSessionResource
 +    private CacheStoreSession storeSes;
 +
 +    /** Auto-injected logger instance. */
 +    @SuppressWarnings("unused")
 +    @LoggerResource
 +    private IgniteLogger log;
 +
 +    /** Cassandra data source. */
 +    private DataSource dataSrc;
 +
 +    /** Max workers thread count. These threads are responsible for load cache. */
 +    private int maxPoolSize = Runtime.getRuntime().availableProcessors();
 +
 +    /** Controller component responsible for serialization logic. */
 +    private final PersistenceController controller;
 +
 +    /**
 +     * Store constructor.
 +     *
 +     * @param dataSrc Data source.
 +     * @param settings Persistence settings for Ignite key and value objects.
 +     * @param maxPoolSize Max workers thread count.
 +     */
 +    public CassandraCacheStore(DataSource dataSrc, KeyValuePersistenceSettings settings, int maxPoolSize) {
 +        this.dataSrc = dataSrc;
 +        this.controller = new PersistenceController(settings);
 +        this.maxPoolSize = maxPoolSize;
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void loadCache(IgniteBiInClosure<K, V> clo, Object... args) throws CacheLoaderException {
 +        if (clo == null)
 +            return;
 +
 +        if (args == null || args.length == 0)
 +            args = new String[] {"select * from " + controller.getPersistenceSettings().getKeyspace() + "." + cassandraTable() + ";"};
 +
 +        ExecutorService pool = null;
 +
 +        Collection<Future<?>> futs = new ArrayList<>(args.length);
 +
 +        try {
 +            pool = Executors.newFixedThreadPool(maxPoolSize);
 +
 +            CassandraSession ses = getCassandraSession();
 +
 +            for (Object obj : args) {
 +                if (obj == null || !(obj instanceof String) || !((String)obj).trim().toLowerCase().startsWith("select"))
 +                    continue;
 +
 +                futs.add(pool.submit(new LoadCacheCustomQueryWorker<>(ses, (String) obj, controller, log, clo)));
 +            }
 +
 +            for (Future<?> fut : futs)
 +                U.get(fut);
 +
 +            if (log != null && log.isDebugEnabled() && storeSes != null)
 +                log.debug("Cache loaded from db: " + storeSes.cacheName());
 +        }
 +        catch (IgniteCheckedException e) {
 +            if (storeSes != null)
 +                throw new CacheLoaderException("Failed to load Ignite cache: " + storeSes.cacheName(), e.getCause());
 +            else
 +                throw new CacheLoaderException("Failed to load cache", e.getCause());
 +        }
 +        finally {
 +            U.shutdownNow(getClass(), pool, log);
 +        }
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void sessionEnd(boolean commit) throws CacheWriterException {
 +        if (!storeSes.isWithinTransaction())
 +            return;
 +
 +        List<Mutation> mutations = mutations();
 +        if (mutations == null || mutations.isEmpty())
 +            return;
 +
 +        CassandraSession ses = getCassandraSession();
 +
 +        try {
 +            ses.execute(mutations);
 +        }
 +        finally {
 +            mutations.clear();
 +            U.closeQuiet(ses);
 +        }
 +    }
 +
 +    /** {@inheritDoc} */
 +    @SuppressWarnings({"unchecked"})
 +    @Override public V load(final K key) throws CacheLoaderException {
 +        if (key == null)
 +            return null;
 +
 +        CassandraSession ses = getCassandraSession();
 +
 +        try {
 +            return ses.execute(new ExecutionAssistant<V>() {
 +                /** {@inheritDoc} */
 +                @Override public boolean tableExistenceRequired() {
 +                    return false;
 +                }
 +
 +                /** {@inheritDoc} */
 +                @Override public String getTable() {
 +                    return cassandraTable();
 +                }
 +
 +                /** {@inheritDoc} */
 +                @Override public String getStatement() {
 +                    return controller.getLoadStatement(cassandraTable(), false);
 +                }
 +
 +                /** {@inheritDoc} */
 +                @Override public BoundStatement bindStatement(PreparedStatement statement) {
 +                    return controller.bindKey(statement, key);
 +                }
 +
 +                /** {@inheritDoc} */
 +                @Override public KeyValuePersistenceSettings getPersistenceSettings() {
 +                    return controller.getPersistenceSettings();
 +                }
 +
 +                /** {@inheritDoc} */
 +                @Override public String operationName() {
 +                    return "READ";
 +                }
 +
 +                /** {@inheritDoc} */
 +                @Override public V process(Row row) {
 +                    return row == null ? null : (V)controller.buildValueObject(row);
 +                }
 +            });
 +        }
 +        finally {
 +            U.closeQuiet(ses);
 +        }
 +    }
 +
 +    /** {@inheritDoc} */
 +    @SuppressWarnings("unchecked")
 +    @Override public Map<K, V> loadAll(Iterable<? extends K> keys) throws CacheLoaderException {
 +        if (keys == null || !keys.iterator().hasNext())
 +            return new HashMap<>();
 +
 +        CassandraSession ses = getCassandraSession();
 +
 +        try {
 +            return ses.execute(new GenericBatchExecutionAssistant<Map<K, V>, K>() {
 +                private Map<K, V> data = new HashMap<>();
 +
 +                /** {@inheritDoc} */
 +                @Override public String getTable() {
 +                    return cassandraTable();
 +                }
 +
 +                /** {@inheritDoc} */
 +                @Override public String getStatement() {
 +                    return controller.getLoadStatement(cassandraTable(), true);
 +                }
 +
 +                /** {@inheritDoc} */
 +                @Override  public BoundStatement bindStatement(PreparedStatement statement, K key) {
 +                    return controller.bindKey(statement, key);
 +                }
 +
 +                /** {@inheritDoc} */
 +                @Override public KeyValuePersistenceSettings getPersistenceSettings() {
 +                    return controller.getPersistenceSettings();
 +                }
 +
 +                /** {@inheritDoc} */
 +                @Override public String operationName() {
 +                    return "BULK_READ";
 +                }
 +
 +                /** {@inheritDoc} */
 +                @Override public Map<K, V> processedData() {
 +                    return data;
 +                }
 +
 +                /** {@inheritDoc} */
 +                @Override protected void process(Row row) {
 +                    data.put((K)controller.buildKeyObject(row), (V)controller.buildValueObject(row));
 +                }
 +            }, keys);
 +        }
 +        finally {
 +            U.closeQuiet(ses);
 +        }
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void write(final Cache.Entry<? extends K, ? extends V> entry) throws CacheWriterException {
 +        if (entry == null || entry.getKey() == null)
 +            return;
 +
 +        if (storeSes.isWithinTransaction()) {
 +            accumulate(new WriteMutation(entry, cassandraTable(), controller));
 +            return;
 +        }
 +
 +        CassandraSession ses = getCassandraSession();
 +
 +        try {
 +            ses.execute(new ExecutionAssistant<Void>() {
 +                /** {@inheritDoc} */
 +                @Override public boolean tableExistenceRequired() {
 +                    return true;
 +                }
 +
 +                /** {@inheritDoc} */
 +                @Override public String getTable() {
 +                    return cassandraTable();
 +                }
 +
 +                /** {@inheritDoc} */
 +                @Override public String getStatement() {
 +                    return controller.getWriteStatement(cassandraTable());
 +                }
 +
 +                /** {@inheritDoc} */
 +                @Override public BoundStatement bindStatement(PreparedStatement statement) {
 +                    return controller.bindKeyValue(statement, entry.getKey(), entry.getValue());
 +                }
 +
 +                /** {@inheritDoc} */
 +                @Override public KeyValuePersistenceSettings getPersistenceSettings() {
 +                    return controller.getPersistenceSettings();
 +                }
 +
 +                /** {@inheritDoc} */
 +                @Override public String operationName() {
 +                    return "WRITE";
 +                }
 +
 +                /** {@inheritDoc} */
 +                @Override public Void process(Row row) {
 +                    return null;
 +                }
 +            });
 +        }
 +        finally {
 +            U.closeQuiet(ses);
 +        }
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void writeAll(Collection<Cache.Entry<? extends K, ? extends V>> entries) throws CacheWriterException {
 +        if (entries == null || entries.isEmpty())
 +            return;
 +
 +        if (storeSes.isWithinTransaction()) {
 +            for (Cache.Entry<?, ?> entry : entries)
 +                accumulate(new WriteMutation(entry, cassandraTable(), controller));
 +
 +            return;
 +        }
 +
 +        CassandraSession ses = getCassandraSession();
 +
 +        try {
 +            ses.execute(new GenericBatchExecutionAssistant<Void, Cache.Entry<? extends K, ? extends V>>() {
 +                /** {@inheritDoc} */
 +                @Override public String getTable() {
 +                    return cassandraTable();
 +                }
 +
 +                /** {@inheritDoc} */
 +                @Override public String getStatement() {
 +                    return controller.getWriteStatement(cassandraTable());
 +                }
 +
 +                /** {@inheritDoc} */
 +                @Override public BoundStatement bindStatement(PreparedStatement statement,
 +                    Cache.Entry<? extends K, ? extends V> entry) {
 +                    return controller.bindKeyValue(statement, entry.getKey(), entry.getValue());
 +                }
 +
 +                /** {@inheritDoc} */
 +                @Override public KeyValuePersistenceSettings getPersistenceSettings() {
 +                    return controller.getPersistenceSettings();
 +                }
 +
 +                /** {@inheritDoc} */
 +                @Override public String operationName() {
 +                    return "BULK_WRITE";
 +                }
 +
 +                /** {@inheritDoc} */
 +                @Override public boolean tableExistenceRequired() {
 +                    return true;
 +                }
 +            }, entries);
 +        }
 +        finally {
 +            U.closeQuiet(ses);
 +        }
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void delete(final Object key) throws CacheWriterException {
 +        if (key == null)
 +            return;
 +
 +        if (storeSes.isWithinTransaction()) {
 +            accumulate(new DeleteMutation(key, cassandraTable(), controller));
 +            return;
 +        }
 +
 +        CassandraSession ses = getCassandraSession();
 +
 +        try {
 +            ses.execute(new ExecutionAssistant<Void>() {
 +                /** {@inheritDoc} */
 +                @Override public boolean tableExistenceRequired() {
 +                    return false;
 +                }
 +
 +                /** {@inheritDoc} */
 +                @Override public String getTable() {
 +                    return cassandraTable();
 +                }
 +
 +                /** {@inheritDoc} */
 +                @Override public String getStatement() {
 +                    return controller.getDeleteStatement(cassandraTable());
 +                }
 +
 +                /** {@inheritDoc} */
 +                @Override public BoundStatement bindStatement(PreparedStatement statement) {
 +                    return controller.bindKey(statement, key);
 +                }
 +
 +
 +                /** {@inheritDoc} */
 +                @Override public KeyValuePersistenceSettings getPersistenceSettings() {
 +                    return controller.getPersistenceSettings();
 +                }
 +
 +                /** {@inheritDoc} */
 +                @Override public String operationName() {
 +                    return "DELETE";
 +                }
 +
 +                /** {@inheritDoc} */
 +                @Override public Void process(Row row) {
 +                    return null;
 +                }
 +            });
 +        }
 +        finally {
 +            U.closeQuiet(ses);
 +        }
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void deleteAll(Collection<?> keys) throws CacheWriterException {
 +        if (keys == null || keys.isEmpty())
 +            return;
 +
 +        if (storeSes.isWithinTransaction()) {
 +            for (Object key : keys)
 +                accumulate(new DeleteMutation(key, cassandraTable(), controller));
 +
 +            return;
 +        }
 +
 +        CassandraSession ses = getCassandraSession();
 +
 +        try {
 +            ses.execute(new GenericBatchExecutionAssistant<Void, Object>() {
 +                /** {@inheritDoc} */
 +                @Override public String getTable() {
 +                    return cassandraTable();
 +                }
 +
 +                /** {@inheritDoc} */
 +                @Override public String getStatement() {
 +                    return controller.getDeleteStatement(cassandraTable());
 +                }
 +
 +                /** {@inheritDoc} */
 +                @Override public BoundStatement bindStatement(PreparedStatement statement, Object key) {
 +                    return controller.bindKey(statement, key);
 +                }
 +
 +                /** {@inheritDoc} */
 +                @Override public KeyValuePersistenceSettings getPersistenceSettings() {
 +                    return controller.getPersistenceSettings();
 +                }
 +
 +                /** {@inheritDoc} */
 +                @Override public String operationName() {
 +                    return "BULK_DELETE";
 +                }
 +            }, keys);
 +        }
 +        finally {
 +            U.closeQuiet(ses);
 +        }
 +    }
 +
 +    /**
 +     * Gets Cassandra session wrapper or creates new if it doesn't exist.
 +     * This wrapper hides all the low-level Cassandra interaction details by providing only high-level methods.
 +     *
 +     * @return Cassandra session wrapper.
 +     */
 +    private CassandraSession getCassandraSession() {
 +        return dataSrc.session(log != null ? log : new NullLogger());
 +    }
 +
 +    /**
 +     * Returns table name to use for all Cassandra based operations (READ/WRITE/DELETE).
 +     *
 +     * @return Table name.
 +     */
 +    private String cassandraTable() {
 +        return controller.getPersistenceSettings().getTable() != null ?
 +            controller.getPersistenceSettings().getTable() : storeSes.cacheName().trim().toLowerCase();
 +    }
 +
 +    /**
 +     * Accumulates mutation in the transaction buffer.
 +     *
 +     * @param mutation Mutation operation.
 +     */
 +    private void accumulate(Mutation mutation) {
 +        //noinspection unchecked
 +        List<Mutation> mutations = (List<Mutation>)storeSes.properties().get(TRANSACTION_BUFFER);
 +
 +        if (mutations == null) {
 +            mutations = new LinkedList<>();
 +            storeSes.properties().put(TRANSACTION_BUFFER, mutations);
 +        }
 +
 +        mutations.add(mutation);
 +    }
 +
 +    /**
 +     * Returns all the mutations performed withing transaction.
 +     *
 +     * @return Mutations
 +     */
 +    private List<Mutation> mutations() {
 +        //noinspection unchecked
 +        return (List<Mutation>)storeSes.properties().get(TRANSACTION_BUFFER);
 +    }
++
++    /** {@inheritDoc} */
++    @Override public String toString() {
++        return S.toString(CassandraCacheStore.class, this);
++    }
 +}

http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java
----------------------------------------------------------------------
diff --cc modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java
index f582aac,0000000..1ba3c7d
mode 100644,000000..100644
--- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java
@@@ -1,647 -1,0 +1,656 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.ignite.cache.store.cassandra.datasource;
 +
 +import com.datastax.driver.core.AuthProvider;
 +import com.datastax.driver.core.Cluster;
 +import com.datastax.driver.core.ConsistencyLevel;
 +import com.datastax.driver.core.NettyOptions;
 +import com.datastax.driver.core.PoolingOptions;
 +import com.datastax.driver.core.ProtocolOptions;
 +import com.datastax.driver.core.ProtocolVersion;
 +import com.datastax.driver.core.SSLOptions;
 +import com.datastax.driver.core.SocketOptions;
 +import com.datastax.driver.core.policies.AddressTranslator;
 +import com.datastax.driver.core.policies.LoadBalancingPolicy;
 +import com.datastax.driver.core.policies.ReconnectionPolicy;
 +import com.datastax.driver.core.policies.RetryPolicy;
 +import com.datastax.driver.core.policies.SpeculativeExecutionPolicy;
 +
 +import java.io.Externalizable;
 +import java.io.IOException;
 +import java.io.ObjectInput;
 +import java.io.ObjectOutput;
 +import java.io.Serializable;
 +import java.net.InetAddress;
 +import java.net.InetSocketAddress;
 +import java.util.LinkedList;
 +import java.util.List;
 +import java.util.UUID;
 +
 +import org.apache.ignite.IgniteException;
 +import org.apache.ignite.IgniteLogger;
 +import org.apache.ignite.cache.store.cassandra.session.CassandraSession;
 +import org.apache.ignite.cache.store.cassandra.session.CassandraSessionImpl;
 +import org.apache.ignite.internal.util.typedef.internal.U;
++import org.apache.ignite.internal.util.tostring.GridToStringExclude;
++import org.apache.ignite.internal.util.typedef.internal.S;
 +
 +/**
 + * Data source abstraction to specify configuration of the Cassandra session to be used.
 + */
 +public class DataSource implements Externalizable {
 +    /** */
 +    private static final long serialVersionUID = 0L;
 +
 +    /**
 +     * Null object, used as a replacement for those Cassandra connection options which
 +     * don't support serialization (RetryPolicy, LoadBalancingPolicy and etc).
 +     */
 +    private static final UUID NULL_OBJECT = UUID.fromString("45ffae47-3193-5910-84a2-048fe65735d9");
 +
 +    /** Number of rows to immediately fetch in CQL statement execution. */
 +    private Integer fetchSize;
 +
 +    /** Consistency level for READ operations. */
 +    private ConsistencyLevel readConsistency;
 +
 +    /** Consistency level for WRITE operations. */
 +    private ConsistencyLevel writeConsistency;
 +
 +    /** Username to use for authentication. */
++    @GridToStringExclude
 +    private String user;
 +
 +    /** Password to use for authentication. */
++    @GridToStringExclude
 +    private String pwd;
 +
 +    /** Port to use for Cassandra connection. */
 +    private Integer port;
 +
 +    /** List of contact points to connect to Cassandra cluster. */
 +    private List<InetAddress> contactPoints;
 +
 +    /** List of contact points with ports to connect to Cassandra cluster. */
 +    private List<InetSocketAddress> contactPointsWithPorts;
 +
 +    /** Maximum time to wait for schema agreement before returning from a DDL query. */
 +    private Integer maxSchemaAgreementWaitSeconds;
 +
 +    /** The native protocol version to use. */
 +    private Integer protoVer;
 +
 +    /** Compression to use for the transport. */
 +    private String compression;
 +
 +    /** Use SSL for communications with Cassandra. */
 +    private Boolean useSSL;
 +
 +    /** Enables metrics collection. */
 +    private Boolean collectMetrix;
 +
 +    /** Enables JMX reporting of the metrics. */
 +    private Boolean jmxReporting;
 +
 +    /** Credentials to use for authentication. */
 +    private Credentials creds;
 +
 +    /** Load balancing policy to use. */
 +    private LoadBalancingPolicy loadBalancingPlc;
 +
 +    /** Reconnection policy to use. */
 +    private ReconnectionPolicy reconnectionPlc;
 +
 +    /** Retry policy to use. */
 +    private RetryPolicy retryPlc;
 +
 +    /** Address translator to use. */
 +    private AddressTranslator addrTranslator;
 +
 +    /** Speculative execution policy to use. */
 +    private SpeculativeExecutionPolicy speculativeExecutionPlc;
 +
 +    /** Authentication provider to use. */
 +    private AuthProvider authProvider;
 +
 +    /** SSL options to use. */
 +    private SSLOptions sslOptions;
 +
 +    /** Connection pooling options to use. */
 +    private PoolingOptions poolingOptions;
 +
 +    /** Socket options to use. */
 +    private SocketOptions sockOptions;
 +
 +    /** Netty options to use for connection. */
 +    private NettyOptions nettyOptions;
 +
 +    /** Cassandra session wrapper instance. */
 +    private volatile CassandraSession ses;
 +
 +    /**
 +     * Sets user name to use for authentication.
 +     *
 +     * @param user user name
 +     */
 +    @SuppressWarnings("UnusedDeclaration")
 +    public void setUser(String user) {
 +        this.user = user;
 +
 +        invalidate();
 +    }
 +
 +    /**
 +     * Sets password to use for authentication.
 +     *
 +     * @param pwd password
 +     */
 +    @SuppressWarnings("UnusedDeclaration")
 +    public void setPassword(String pwd) {
 +        this.pwd = pwd;
 +
 +        invalidate();
 +    }
 +
 +    /**
 +     * Sets port to use for Cassandra connection.
 +     *
 +     * @param port port
 +     */
 +    @SuppressWarnings("UnusedDeclaration")
 +    public void setPort(int port) {
 +        this.port = port;
 +
 +        invalidate();
 +    }
 +
 +    /**
 +     * Sets list of contact points to connect to Cassandra cluster.
 +     *
 +     * @param points contact points
 +     */
 +    public void setContactPoints(String... points) {
 +        if (points == null || points.length == 0)
 +            return;
 +
 +        for (String point : points) {
 +            if (point.contains(":")) {
 +                if (contactPointsWithPorts == null)
 +                    contactPointsWithPorts = new LinkedList<>();
 +
 +                String[] chunks = point.split(":");
 +
 +                try {
 +                    contactPointsWithPorts.add(InetSocketAddress.createUnresolved(chunks[0].trim(), Integer.parseInt(chunks[1].trim())));
 +                }
 +                catch (Throwable e) {
 +                    throw new IllegalArgumentException("Incorrect contact point '" + point + "' specified for Cassandra cache storage", e);
 +                }
 +            }
 +            else {
 +                if (contactPoints == null)
 +                    contactPoints = new LinkedList<>();
 +
 +                try {
 +                    contactPoints.add(InetAddress.getByName(point));
 +                }
 +                catch (Throwable e) {
 +                    throw new IllegalArgumentException("Incorrect contact point '" + point + "' specified for Cassandra cache storage", e);
 +                }
 +            }
 +        }
 +
 +        invalidate();
 +    }
 +
 +    /** Sets maximum time to wait for schema agreement before returning from a DDL query. */
 +    @SuppressWarnings("UnusedDeclaration")
 +    public void setMaxSchemaAgreementWaitSeconds(int seconds) {
 +        maxSchemaAgreementWaitSeconds = seconds;
 +
 +        invalidate();
 +    }
 +
 +    /**
 +     * Sets the native protocol version to use.
 +     *
 +     * @param ver version number
 +     */
 +    @SuppressWarnings("UnusedDeclaration")
 +    public void setProtocolVersion(int ver) {
 +        protoVer = ver;
 +
 +        invalidate();
 +    }
 +
 +    /**
 +     * Sets compression algorithm to use for the transport.
 +     *
 +     * @param compression Compression algorithm.
 +     */
 +    @SuppressWarnings("UnusedDeclaration")
 +    public void setCompression(String compression) {
 +        this.compression = compression == null || compression.trim().isEmpty() ? null : compression.trim();
 +
 +        try {
 +            if (this.compression != null)
 +                ProtocolOptions.Compression.valueOf(this.compression);
 +        }
 +        catch (Throwable e) {
 +            throw new IgniteException("Incorrect compression '" + compression + "' specified for Cassandra connection", e);
 +        }
 +
 +        invalidate();
 +    }
 +
 +    /**
 +     * Enables SSL for communications with Cassandra.
 +     *
 +     * @param use Flag to enable/disable SSL.
 +     */
 +    @SuppressWarnings("UnusedDeclaration")
 +    public void setUseSSL(boolean use) {
 +        useSSL = use;
 +
 +        invalidate();
 +    }
 +
 +    /**
 +     * Enables metrics collection.
 +     *
 +     * @param collect Flag to enable/disable metrics collection.
 +     */
 +    @SuppressWarnings("UnusedDeclaration")
 +    public void setCollectMetrix(boolean collect) {
 +        collectMetrix = collect;
 +
 +        invalidate();
 +    }
 +
 +    /**
 +     * Enables JMX reporting of the metrics.
 +     *
 +     * @param enableReporting Flag to enable/disable JMX reporting.
 +     */
 +    @SuppressWarnings("UnusedDeclaration")
 +    public void setJmxReporting(boolean enableReporting) {
 +        jmxReporting = enableReporting;
 +
 +        invalidate();
 +    }
 +
 +    /**
 +     * Sets number of rows to immediately fetch in CQL statement execution.
 +     *
 +     * @param size Number of rows to fetch.
 +     */
 +    @SuppressWarnings("UnusedDeclaration")
 +    public void setFetchSize(int size) {
 +        fetchSize = size;
 +
 +        invalidate();
 +    }
 +
 +    /**
 +     * Set consistency level for READ operations.
 +     *
 +     * @param level Consistency level.
 +     */
 +    public void setReadConsistency(String level) {
 +        readConsistency = parseConsistencyLevel(level);
 +
 +        invalidate();
 +    }
 +
 +    /**
 +     * Set consistency level for WRITE operations.
 +     *
 +     * @param level Consistency level.
 +     */
 +    public void setWriteConsistency(String level) {
 +        writeConsistency = parseConsistencyLevel(level);
 +
 +        invalidate();
 +    }
 +
 +    /**
 +     * Sets credentials to use for authentication.
 +     *
 +     * @param creds Credentials.
 +     */
 +    public void setCredentials(Credentials creds) {
 +        this.creds = creds;
 +
 +        invalidate();
 +    }
 +
 +    /**
 +     * Sets load balancing policy.
 +     *
 +     * @param plc Load balancing policy.
 +     */
 +    public void setLoadBalancingPolicy(LoadBalancingPolicy plc) {
 +        loadBalancingPlc = plc;
 +
 +        invalidate();
 +    }
 +
 +    /**
 +     * Sets reconnection policy.
 +     *
 +     * @param plc Reconnection policy.
 +     */
 +    @SuppressWarnings("UnusedDeclaration")
 +    public void setReconnectionPolicy(ReconnectionPolicy plc) {
 +        reconnectionPlc = plc;
 +
 +        invalidate();
 +    }
 +
 +    /**
 +     * Sets retry policy.
 +     *
 +     * @param plc Retry policy.
 +     */
 +    @SuppressWarnings("UnusedDeclaration")
 +    public void setRetryPolicy(RetryPolicy plc) {
 +        retryPlc = plc;
 +
 +        invalidate();
 +    }
 +
 +    /**
 +     * Sets address translator.
 +     *
 +     * @param translator Address translator.
 +     */
 +    @SuppressWarnings("UnusedDeclaration")
 +    public void setAddressTranslator(AddressTranslator translator) {
 +        addrTranslator = translator;
 +
 +        invalidate();
 +    }
 +
 +    /**
 +     * Sets speculative execution policy.
 +     *
 +     * @param plc Speculative execution policy.
 +     */
 +    @SuppressWarnings("UnusedDeclaration")
 +    public void setSpeculativeExecutionPolicy(SpeculativeExecutionPolicy plc) {
 +        speculativeExecutionPlc = plc;
 +
 +        invalidate();
 +    }
 +
 +    /**
 +     * Sets authentication provider.
 +     *
 +     * @param provider Authentication provider.
 +     */
 +    @SuppressWarnings("UnusedDeclaration")
 +    public void setAuthProvider(AuthProvider provider) {
 +        authProvider = provider;
 +
 +        invalidate();
 +    }
 +
 +    /**
 +     * Sets SSL options.
 +     *
 +     * @param options SSL options.
 +     */
 +    @SuppressWarnings("UnusedDeclaration")
 +    public void setSslOptions(SSLOptions options) {
 +        sslOptions = options;
 +
 +        invalidate();
 +    }
 +
 +    /**
 +     * Sets pooling options.
 +     *
 +     * @param options pooling options to use.
 +     */
 +    @SuppressWarnings("UnusedDeclaration")
 +    public void setPoolingOptions(PoolingOptions options) {
 +        poolingOptions = options;
 +
 +        invalidate();
 +    }
 +
 +    /**
 +     * Sets socket options to use.
 +     *
 +     * @param options Socket options.
 +     */
 +    @SuppressWarnings("UnusedDeclaration")
 +    public void setSocketOptions(SocketOptions options) {
 +        sockOptions = options;
 +
 +        invalidate();
 +    }
 +
 +    /**
 +     * Sets netty options to use.
 +     *
 +     * @param options netty options.
 +     */
 +    @SuppressWarnings("UnusedDeclaration")
 +    public void setNettyOptions(NettyOptions options) {
 +        nettyOptions = options;
 +
 +        invalidate();
 +    }
 +
 +    /**
 +     * Creates Cassandra session wrapper if it wasn't created yet and returns it
 +     *
 +     * @param log logger
 +     * @return Cassandra session wrapper
 +     */
 +    @SuppressWarnings("deprecation")
 +    public synchronized CassandraSession session(IgniteLogger log) {
 +        if (ses != null)
 +            return ses;
 +
 +        Cluster.Builder builder = Cluster.builder();
 +
 +        if (user != null)
 +            builder = builder.withCredentials(user, pwd);
 +
 +        if (port != null)
 +            builder = builder.withPort(port);
 +
 +        if (contactPoints != null)
 +            builder = builder.addContactPoints(contactPoints);
 +
 +        if (contactPointsWithPorts != null)
 +            builder = builder.addContactPointsWithPorts(contactPointsWithPorts);
 +
 +        if (maxSchemaAgreementWaitSeconds != null)
 +            builder = builder.withMaxSchemaAgreementWaitSeconds(maxSchemaAgreementWaitSeconds);
 +
 +        if (protoVer != null)
 +            builder = builder.withProtocolVersion(ProtocolVersion.fromInt(protoVer));
 +
 +        if (compression != null) {
 +            try {
 +                builder = builder.withCompression(ProtocolOptions.Compression.valueOf(compression.trim().toLowerCase()));
 +            }
 +            catch (IllegalArgumentException e) {
 +                throw new IgniteException("Incorrect compression option '" + compression + "' specified for Cassandra connection", e);
 +            }
 +        }
 +
 +        if (useSSL != null && useSSL)
 +            builder = builder.withSSL();
 +
 +        if (sslOptions != null)
 +            builder = builder.withSSL(sslOptions);
 +
 +        if (collectMetrix != null && !collectMetrix)
 +            builder = builder.withoutMetrics();
 +
 +        if (jmxReporting != null && !jmxReporting)
 +            builder = builder.withoutJMXReporting();
 +
 +        if (creds != null)
 +            builder = builder.withCredentials(creds.getUser(), creds.getPassword());
 +
 +        if (loadBalancingPlc != null)
 +            builder = builder.withLoadBalancingPolicy(loadBalancingPlc);
 +
 +        if (reconnectionPlc != null)
 +            builder = builder.withReconnectionPolicy(reconnectionPlc);
 +
 +        if (retryPlc != null)
 +            builder = builder.withRetryPolicy(retryPlc);
 +
 +        if (addrTranslator != null)
 +            builder = builder.withAddressTranslator(addrTranslator);
 +
 +        if (speculativeExecutionPlc != null)
 +            builder = builder.withSpeculativeExecutionPolicy(speculativeExecutionPlc);
 +
 +        if (authProvider != null)
 +            builder = builder.withAuthProvider(authProvider);
 +
 +        if (poolingOptions != null)
 +            builder = builder.withPoolingOptions(poolingOptions);
 +
 +        if (sockOptions != null)
 +            builder = builder.withSocketOptions(sockOptions);
 +
 +        if (nettyOptions != null)
 +            builder = builder.withNettyOptions(nettyOptions);
 +
 +        return ses = new CassandraSessionImpl(builder, fetchSize, readConsistency, writeConsistency, log);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @Override public void writeExternal(ObjectOutput out) throws IOException {
 +        out.writeObject(fetchSize);
 +        out.writeObject(readConsistency);
 +        out.writeObject(writeConsistency);
 +        U.writeString(out, user);
 +        U.writeString(out, pwd);
 +        out.writeObject(port);
 +        out.writeObject(contactPoints);
 +        out.writeObject(contactPointsWithPorts);
 +        out.writeObject(maxSchemaAgreementWaitSeconds);
 +        out.writeObject(protoVer);
 +        U.writeString(out, compression);
 +        out.writeObject(useSSL);
 +        out.writeObject(collectMetrix);
 +        out.writeObject(jmxReporting);
 +        out.writeObject(creds);
 +        writeObject(out, loadBalancingPlc);
 +        writeObject(out, reconnectionPlc);
 +        writeObject(out, addrTranslator);
 +        writeObject(out, speculativeExecutionPlc);
 +        writeObject(out, authProvider);
 +        writeObject(out, sslOptions);
 +        writeObject(out, poolingOptions);
 +        writeObject(out, sockOptions);
 +        writeObject(out, nettyOptions);
 +    }
 +
 +    /** {@inheritDoc} */
 +    @SuppressWarnings("unchecked")
 +    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
 +        fetchSize = (Integer)in.readObject();
 +        readConsistency = (ConsistencyLevel)in.readObject();
 +        writeConsistency = (ConsistencyLevel)in.readObject();
 +        user = U.readString(in);
 +        pwd = U.readString(in);
 +        port = (Integer)in.readObject();
 +        contactPoints = (List<InetAddress>)in.readObject();
 +        contactPointsWithPorts = (List<InetSocketAddress>)in.readObject();
 +        maxSchemaAgreementWaitSeconds = (Integer)in.readObject();
 +        protoVer = (Integer)in.readObject();
 +        compression = U.readString(in);
 +        useSSL = (Boolean)in.readObject();
 +        collectMetrix = (Boolean)in.readObject();
 +        jmxReporting = (Boolean)in.readObject();
 +        creds = (Credentials)in.readObject();
 +        loadBalancingPlc = (LoadBalancingPolicy)readObject(in);
 +        reconnectionPlc = (ReconnectionPolicy)readObject(in);
 +        addrTranslator = (AddressTranslator)readObject(in);
 +        speculativeExecutionPlc = (SpeculativeExecutionPolicy)readObject(in);
 +        authProvider = (AuthProvider)readObject(in);
 +        sslOptions = (SSLOptions)readObject(in);
 +        poolingOptions = (PoolingOptions)readObject(in);
 +        sockOptions = (SocketOptions)readObject(in);
 +        nettyOptions = (NettyOptions)readObject(in);
 +    }
 +
 +    /**
 +     * Helper method used to serialize class members
 +     * @param out the stream to write the object to
 +     * @param obj the object to be written
 +     * @throws IOException Includes any I/O exceptions that may occur
 +     */
 +    private void writeObject(ObjectOutput out, Object obj) throws IOException {
 +        out.writeObject(obj == null || !(obj instanceof Serializable) ? NULL_OBJECT : obj);
 +    }
 +
 +    /**
 +     * Helper method used to deserialize class members
 +     * @param in the stream to read data from in order to restore the object
 +     * @throws IOException Includes any I/O exceptions that may occur
 +     * @throws ClassNotFoundException If the class for an object being restored cannot be found
 +     * @return deserialized object
 +     */
 +    private Object readObject(ObjectInput in) throws IOException, ClassNotFoundException {
 +        Object obj = in.readObject();
 +        return NULL_OBJECT.equals(obj) ? null : obj;
 +    }
 +
 +    /**
 +     * Parses consistency level provided as string.
 +     *
 +     * @param level consistency level string.
 +     *
 +     * @return consistency level.
 +     */
 +    private ConsistencyLevel parseConsistencyLevel(String level) {
 +        if (level == null)
 +            return null;
 +
 +        try {
 +            return ConsistencyLevel.valueOf(level.trim().toUpperCase());
 +        }
 +        catch (Throwable e) {
 +            throw new IgniteException("Incorrect consistency level '" + level + "' specified for Cassandra connection", e);
 +        }
 +    }
 +
 +    /**
 +     * Invalidates session.
 +     */
 +    private synchronized void invalidate() {
 +        ses = null;
 +    }
++
++    /** {@inheritDoc} */
++    @Override public String toString() {
++        return S.toString(DataSource.class, this);
++    }
 +}

http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/IgniteServices.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/IgniteServicesImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryClassDescriptor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryObjectBuilderImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 5f0b8a0,4d59d50..8187e8f
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@@ -3076,8 -3233,8 +3076,8 @@@ public abstract class GridCacheAdapter<
      }
  
      /** {@inheritDoc} */
 -    @Override public CacheMetrics clusterMetrics() {
 +    @Override public final CacheMetrics clusterMetrics() {
-         return clusterMetrics(ctx.grid().cluster().forCacheNodes(ctx.name()));
+         return clusterMetrics(ctx.grid().cluster().forDataNodes(ctx.name()));
      }
  
      /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcRequestHandler.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index aeb3ef4,3690f35..d26242d
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@@ -1587,9 -1590,9 +1590,9 @@@ public class GridServiceProcessor exten
                  else
                      topVer = new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion(), 0);
  
 -                depExe.submit(new BusyRunnable() {
 +                depExe.execute(new BusyRunnable() {
                      @Override public void run0() {
-                         ClusterNode oldest = CU.oldestAliveCacheServerNode(cache.context().shared(), topVer);
+                         ClusterNode oldest = ctx.discovery().oldestAliveCacheServerNode(topVer);
  
                          if (oldest != null && oldest.isLocal()) {
                              final Collection<GridServiceDeployment> retries = new ConcurrentLinkedQueue<>();

http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java
index ee5b65c,43017db..8f8d78a
--- a/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cache/affinity/AbstractAffinityFunctionSelfTest.java
@@@ -104,24 -104,9 +104,25 @@@ public abstract class AbstractAffinityF
      }
  
      /**
+      * @param backups Number of backups.
       * @throws Exception If failed.
       */
 +    public void testNullKeyForPartitionCalculation() throws Exception {
 +        AffinityFunction aff = affinityFunction();
 +
 +        try {
 +            aff.partition(null);
 +
 +            fail("Should throw IllegalArgumentException due to NULL affinity key.");
 +        } catch (IllegalArgumentException e) {
 +            e.getMessage().contains("Null key is passed for a partition calculation. " +
 +                "Make sure that an affinity key that is used is initialized properly.");
 +        }
 +    }
 +
 +    /**
 +     * @throws Exception If failed.
 +     */
      protected void checkNodeRemoved(int backups) throws Exception {
          checkNodeRemoved(backups, 1, 1);
      }

http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/test/java/org/apache/ignite/internal/binary/BinaryMarshallerSelfTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 554bb3d,deec72a..1e73e79
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@@ -39,10 -39,7 +39,11 @@@ import org.apache.ignite.cache.store.jd
  import org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreTest;
  import org.apache.ignite.cache.store.jdbc.GridCacheJdbcBlobStoreMultithreadedSelfTest;
  import org.apache.ignite.cache.store.jdbc.GridCacheJdbcBlobStoreSelfTest;
 +import org.apache.ignite.internal.managers.communication.IgniteCommunicationBalanceMultipleConnectionsTest;
 +import org.apache.ignite.internal.managers.communication.IgniteCommunicationBalanceTest;
 +import org.apache.ignite.internal.managers.communication.IgniteIoTestMessagesTest;
 +import org.apache.ignite.internal.managers.communication.IgniteVariousConnectionNumberTest;
+ import org.apache.ignite.cache.store.jdbc.JdbcTypesDefaultTransformerTest;
  import org.apache.ignite.internal.processors.cache.CacheAffinityCallSelfTest;
  import org.apache.ignite.internal.processors.cache.CacheDeferredDeleteSanitySelfTest;
  import org.apache.ignite.internal.processors.cache.CacheEntryProcessorCopySelfTest;

http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
----------------------------------------------------------------------
diff --cc modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
index 2d06f3a,350b715..b28619c
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteKernalSelfTestSuite.java
@@@ -66,7 -66,7 +66,8 @@@ import org.apache.ignite.internal.proce
  import org.apache.ignite.internal.processors.service.IgniteServiceDeploymentClassLoadingDefaultMarshallerTest;
  import org.apache.ignite.internal.processors.service.IgniteServiceDeploymentClassLoadingJdkMarshallerTest;
  import org.apache.ignite.internal.processors.service.IgniteServiceDeploymentClassLoadingOptimizedMarshallerTest;
 +import org.apache.ignite.internal.processors.service.IgniteServiceDynamicCachesSelfTest;
+ import org.apache.ignite.internal.processors.service.IgniteServiceProxyTimeoutInitializedTest;
  import org.apache.ignite.internal.processors.service.IgniteServiceReassignmentTest;
  import org.apache.ignite.internal.processors.service.ServicePredicateAccessCacheTest;
  import org.apache.ignite.internal.util.GridStartupWithUndefinedIgniteHomeSelfTest;
@@@ -142,7 -142,7 +143,8 @@@ public class IgniteKernalSelfTestSuite 
          suite.addTestSuite(GridServiceProxyNodeStopSelfTest.class);
          suite.addTestSuite(GridServiceProxyClientReconnectSelfTest.class);
          suite.addTestSuite(IgniteServiceReassignmentTest.class);
+         suite.addTestSuite(IgniteServiceProxyTimeoutInitializedTest.class);
 +        suite.addTestSuite(IgniteServiceDynamicCachesSelfTest.class);
  
          suite.addTestSuite(IgniteServiceDeploymentClassLoadingDefaultMarshallerTest.class);
          suite.addTestSuite(IgniteServiceDeploymentClassLoadingOptimizedMarshallerTest.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 5df44db,362ddd8..c541185
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@@ -794,40 -774,29 +794,45 @@@ public class IgniteH2Indexing implement
          throws IgniteCheckedException {
          final Connection conn = connectionForSpace(spaceName);
  
-         initLocalQueryContext(conn, enforceJoinOrder, filters);
+         setupConnection(conn, false, enforceJoinOrder);
  
-         Prepared p = null;
+         final PreparedStatement stmt = preparedStatementWithParams(conn, qry, params, true);
  
-         try {
-             final PreparedStatement stmt = preparedStatementWithParams(conn, qry, params, true);
++        Prepared p = GridSqlQueryParser.prepared((JdbcPreparedStatement)stmt);
 +
-             p = GridSqlQueryParser.prepared((JdbcPreparedStatement) stmt);
++        if (!p.isQuery()) {
++            GridH2QueryContext.clearThreadLocal();
 +
-             if (!p.isQuery()) {
-                 GridH2QueryContext.clearThreadLocal();
++            SqlFieldsQuery fldsQry = new SqlFieldsQuery(qry);
 +
-                 SqlFieldsQuery fldsQry = new SqlFieldsQuery(qry);
++            if (params != null)
++                fldsQry.setArgs(params.toArray());
 +
-                 if (params != null)
-                     fldsQry.setArgs(params.toArray());
++            fldsQry.setEnforceJoinOrder(enforceJoinOrder);
++            fldsQry.setTimeout(timeout, TimeUnit.MILLISECONDS);
 +
-                 fldsQry.setEnforceJoinOrder(enforceJoinOrder);
-                 fldsQry.setTimeout(timeout, TimeUnit.MILLISECONDS);
++            return dmlProc.updateLocalSqlFields(spaceName, stmt, fldsQry, filters, cancel);
++        }
 +
-                 return dmlProc.updateLocalSqlFields(spaceName, stmt, fldsQry, filters, cancel);
-             }
+         List<GridQueryFieldMetadata> meta;
  
-             List<GridQueryFieldMetadata> meta;
+         try {
+             meta = meta(stmt.getMetaData());
+         }
+         catch (SQLException e) {
+             throw new IgniteCheckedException("Cannot prepare query metadata", e);
+         }
  
-             try {
-                 meta = meta(stmt.getMetaData());
-             }
-             catch (SQLException e) {
-                 throw new IgniteCheckedException("Cannot prepare query metadata", e);
-             }
+         final GridH2QueryContext ctx = new GridH2QueryContext(nodeId, nodeId, 0, LOCAL)
+             .filter(filters).distributedJoins(false);
  
-             return new GridQueryFieldsResultAdapter(meta, null) {
-                 @Override public GridCloseableIterator<List<?>> iterator() throws IgniteCheckedException{
+         return new GridQueryFieldsResultAdapter(meta, null) {
+             @Override public GridCloseableIterator<List<?>> iterator() throws IgniteCheckedException {
+                 assert GridH2QueryContext.get() == null;
+ 
+                 GridH2QueryContext.set(ctx);
+ 
+                 try {
                      ResultSet rs = executeSqlQueryWithTimer(spaceName, stmt, conn, qry, params, timeout, cancel);
  
                      return new FieldsIterator(rs);

http://git-wip-us.apache.org/repos/asf/ignite/blob/64247b92/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------