You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2016/09/30 07:51:58 UTC

[1/3] ignite git commit: IGNITE-3609 Utilize Cassandra logged batches for transactions. - Fixes #1111.

Repository: ignite
Updated Branches:
  refs/heads/master 00576d8a9 -> e7f353283


http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java
index 8fdcf4c..d0a787a 100644
--- a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java
+++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/IgnitePersistentStoreTest.java
@@ -18,19 +18,27 @@
 package org.apache.ignite.tests;
 
 import java.util.Collection;
+import java.util.Date;
 import java.util.Map;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteTransactions;
 import org.apache.ignite.Ignition;
 import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.store.CacheStore;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.tests.pojos.Person;
 import org.apache.ignite.tests.pojos.PersonId;
+import org.apache.ignite.tests.pojos.Product;
+import org.apache.ignite.tests.pojos.ProductOrder;
 import org.apache.ignite.tests.utils.CacheStoreHelper;
 import org.apache.ignite.tests.utils.CassandraHelper;
 import org.apache.ignite.tests.utils.TestsHelper;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
 import org.apache.log4j.Logger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -244,12 +252,19 @@ public class IgnitePersistentStoreTest {
 
         Map<Long, Person> personMap1 = TestsHelper.generateLongsPersonsMap();
         Map<PersonId, Person> personMap2 = TestsHelper.generatePersonIdsPersonsMap();
+        Map<Long, Product> productsMap = TestsHelper.generateProductsMap();
+        Map<Long, ProductOrder> ordersMap = TestsHelper.generateOrdersMap();
+
+        Product product = TestsHelper.generateRandomProduct(-1L);
+        ProductOrder order = TestsHelper.generateRandomOrder(-1L);
 
         try (Ignite ignite = Ignition.start("org/apache/ignite/tests/persistence/pojo/ignite-config.xml")) {
             IgniteCache<Long, Person> personCache1 = ignite.getOrCreateCache(new CacheConfiguration<Long, Person>("cache1"));
             IgniteCache<PersonId, Person> personCache2 = ignite.getOrCreateCache(new CacheConfiguration<PersonId, Person>("cache2"));
             IgniteCache<PersonId, Person> personCache3 = ignite.getOrCreateCache(new CacheConfiguration<PersonId, Person>("cache3"));
             IgniteCache<PersonId, Person> personCache4 = ignite.getOrCreateCache(new CacheConfiguration<PersonId, Person>("cache4"));
+            IgniteCache<Long, Product> productCache = ignite.getOrCreateCache(new CacheConfiguration<Long, Product>("product"));
+            IgniteCache<Long, ProductOrder> orderCache = ignite.getOrCreateCache(new CacheConfiguration<Long, ProductOrder>("order"));
 
             LOGGER.info("Running single operation write tests");
 
@@ -262,6 +277,9 @@ public class IgnitePersistentStoreTest {
             personCache3.put(id, TestsHelper.generateRandomPerson(id.getPersonNumber()));
             personCache4.put(id, TestsHelper.generateRandomPerson(id.getPersonNumber()));
 
+            productCache.put(product.getId(), product);
+            orderCache.put(order.getId(), order);
+
             LOGGER.info("Single operation write tests passed");
 
             LOGGER.info("Running bulk operation write tests");
@@ -269,6 +287,8 @@ public class IgnitePersistentStoreTest {
             personCache2.putAll(personMap2);
             personCache3.putAll(personMap2);
             personCache4.putAll(personMap2);
+            productCache.putAll(productsMap);
+            orderCache.putAll(ordersMap);
             LOGGER.info("Bulk operation write tests passed");
         }
 
@@ -283,6 +303,8 @@ public class IgnitePersistentStoreTest {
             IgniteCache<PersonId, Person> personCache2 = ignite.getOrCreateCache(new CacheConfiguration<PersonId, Person>("cache2"));
             IgniteCache<PersonId, Person> personCache3 = ignite.getOrCreateCache(new CacheConfiguration<PersonId, Person>("cache3"));
             IgniteCache<PersonId, Person> personCache4 = ignite.getOrCreateCache(new CacheConfiguration<PersonId, Person>("cache4"));
+            IgniteCache<Long, Product> productCache = ignite.getOrCreateCache(new CacheConfiguration<Long, Product>("product"));
+            IgniteCache<Long, ProductOrder> orderCache = ignite.getOrCreateCache(new CacheConfiguration<Long, ProductOrder>("order"));
 
             LOGGER.info("Running single operation read tests");
             Person person = personCache1.get(1L);
@@ -303,6 +325,14 @@ public class IgnitePersistentStoreTest {
             if (!person.equals(personMap2.get(id)))
                 throw new RuntimeException("Person value was incorrectly deserialized from Cassandra");
 
+            Product product1 = productCache.get(product.getId());
+            if (!product.equals(product1))
+                throw new RuntimeException("Product value was incorrectly deserialized from Cassandra");
+
+            ProductOrder order1 = orderCache.get(order.getId());
+            if (!order.equals(order1))
+                throw new RuntimeException("Order value was incorrectly deserialized from Cassandra");
+
             LOGGER.info("Single operation read tests passed");
 
             LOGGER.info("Running bulk operation read tests");
@@ -323,6 +353,14 @@ public class IgnitePersistentStoreTest {
             if (!TestsHelper.checkPersonMapsEqual(persons4, personMap2, false))
                 throw new RuntimeException("Person values batch was incorrectly deserialized from Cassandra");
 
+            Map<Long, Product> productsMap1 = productCache.getAll(productsMap.keySet());
+            if (!TestsHelper.checkProductMapsEqual(productsMap, productsMap1))
+                throw new RuntimeException("Product values batch was incorrectly deserialized from Cassandra");
+
+            Map<Long, ProductOrder> ordersMap1 = orderCache.getAll(ordersMap.keySet());
+            if (!TestsHelper.checkOrderMapsEqual(ordersMap, ordersMap1))
+                throw new RuntimeException("Order values batch was incorrectly deserialized from Cassandra");
+
             LOGGER.info("Bulk operation read tests passed");
 
             LOGGER.info("POJO strategy read tests passed");
@@ -341,12 +379,35 @@ public class IgnitePersistentStoreTest {
             personCache4.remove(id);
             personCache4.removeAll(personMap2.keySet());
 
+            productCache.remove(product.getId());
+            productCache.removeAll(productsMap.keySet());
+
+            orderCache.remove(order.getId());
+            orderCache.removeAll(ordersMap.keySet());
+
             LOGGER.info("POJO strategy delete tests passed");
         }
     }
 
     /** */
     @Test
+    public void pojoStrategyTransactionTest() {
+        CassandraHelper.dropTestKeyspaces();
+
+        Ignition.stopAll(true);
+
+        try (Ignite ignite = Ignition.start("org/apache/ignite/tests/persistence/pojo/ignite-config.xml")) {
+            pojoStrategyTransactionTest(ignite, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.READ_COMMITTED);
+            pojoStrategyTransactionTest(ignite, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.REPEATABLE_READ);
+            pojoStrategyTransactionTest(ignite, TransactionConcurrency.OPTIMISTIC, TransactionIsolation.SERIALIZABLE);
+            pojoStrategyTransactionTest(ignite, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.READ_COMMITTED);
+            pojoStrategyTransactionTest(ignite, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ);
+            pojoStrategyTransactionTest(ignite, TransactionConcurrency.PESSIMISTIC, TransactionIsolation.SERIALIZABLE);
+        }
+    }
+
+    /** */
+    @Test
     public void loadCacheTest() {
         Ignition.stopAll(true);
 
@@ -360,6 +421,7 @@ public class IgnitePersistentStoreTest {
 
         Collection<CacheEntryImpl<PersonId, Person>> entries = TestsHelper.generatePersonIdsPersonsEntries();
 
+        //noinspection unchecked
         store.writeAll(entries);
 
         LOGGER.info("Cassandra table filled with test data");
@@ -387,4 +449,207 @@ public class IgnitePersistentStoreTest {
 
         LOGGER.info("loadCache test passed");
     }
+
+    /** */
+    @SuppressWarnings("unchecked")
+    private void pojoStrategyTransactionTest(Ignite ignite, TransactionConcurrency concurrency,
+                                             TransactionIsolation isolation) {
+        LOGGER.info("-----------------------------------------------------------------------------------");
+        LOGGER.info("Running POJO transaction tests using " + concurrency +
+                " concurrency and " + isolation + " isolation level");
+        LOGGER.info("-----------------------------------------------------------------------------------");
+
+        CacheStore productStore = CacheStoreHelper.createCacheStore("product",
+            new ClassPathResource("org/apache/ignite/tests/persistence/pojo/product.xml"),
+            CassandraHelper.getAdminDataSrc());
+
+        CacheStore orderStore = CacheStoreHelper.createCacheStore("order",
+            new ClassPathResource("org/apache/ignite/tests/persistence/pojo/order.xml"),
+            CassandraHelper.getAdminDataSrc());
+
+        Map<Long, Product> productsMap = TestsHelper.generateProductsMap(5);
+        Map<Long, Product> productsMap1;
+        Map<Long, ProductOrder> ordersMap = TestsHelper.generateOrdersMap(5);
+        Map<Long, ProductOrder> ordersMap1;
+        Product product = TestsHelper.generateRandomProduct(-1L);
+        ProductOrder order = TestsHelper.generateRandomOrder(-1L, -1L, new Date());
+
+        IgniteTransactions txs = ignite.transactions();
+
+        IgniteCache<Long, Product> productCache = ignite.getOrCreateCache(new CacheConfiguration<Long, Product>("product"));
+        IgniteCache<Long, ProductOrder> orderCache = ignite.getOrCreateCache(new CacheConfiguration<Long, ProductOrder>("order"));
+
+        LOGGER.info("Running POJO strategy write tests");
+
+        LOGGER.info("Running single operation write tests");
+
+        Transaction tx = txs.txStart(concurrency, isolation);
+
+        try {
+            productCache.put(product.getId(), product);
+            orderCache.put(order.getId(), order);
+
+            if (productStore.load(product.getId()) != null || orderStore.load(order.getId()) != null) {
+                throw new RuntimeException("Single write operation test failed. Transaction wasn't committed yet, but " +
+                        "objects were already persisted into Cassandra");
+            }
+
+            Map<Long, Product> products = (Map<Long, Product>)productStore.loadAll(productsMap.keySet());
+            Map<Long, ProductOrder> orders = (Map<Long, ProductOrder>)orderStore.loadAll(ordersMap.keySet());
+
+            if ((products != null && !products.isEmpty()) || (orders != null && !orders.isEmpty())) {
+                throw new RuntimeException("Single write operation test failed. Transaction wasn't committed yet, but " +
+                        "objects were already persisted into Cassandra");
+            }
+
+            tx.commit();
+        }
+        finally {
+            U.closeQuiet(tx);
+        }
+
+        Product product1 = (Product)productStore.load(product.getId());
+        ProductOrder order1 = (ProductOrder)orderStore.load(order.getId());
+
+        if (product1 == null || order1 == null) {
+            throw new RuntimeException("Single write operation test failed. Transaction was committed, but " +
+                    "no objects were persisted into Cassandra");
+        }
+
+        if (!product.equals(product1) || !order.equals(order1)) {
+            throw new RuntimeException("Single write operation test failed. Transaction was committed, but " +
+                    "objects were incorrectly persisted/loaded to/from Cassandra");
+        }
+
+        LOGGER.info("Single operation write tests passed");
+
+        LOGGER.info("Running bulk operation write tests");
+
+        tx = txs.txStart(concurrency, isolation);
+
+        try {
+            productCache.putAll(productsMap);
+            orderCache.putAll(ordersMap);
+
+            productsMap1 = (Map<Long, Product>)productStore.loadAll(productsMap.keySet());
+            ordersMap1 = (Map<Long, ProductOrder>)orderStore.loadAll(ordersMap.keySet());
+
+            if ((productsMap1 != null && !productsMap1.isEmpty()) || (ordersMap1 != null && !ordersMap1.isEmpty())) {
+                throw new RuntimeException("Bulk write operation test failed. Transaction wasn't committed yet, but " +
+                        "objects were already persisted into Cassandra");
+            }
+
+            tx.commit();
+        }
+        finally {
+            U.closeQuiet(tx);
+        }
+
+        productsMap1 = (Map<Long, Product>)productStore.loadAll(productsMap.keySet());
+        ordersMap1 = (Map<Long, ProductOrder>)orderStore.loadAll(ordersMap.keySet());
+
+        if (productsMap1 == null || productsMap1.isEmpty() || ordersMap1 == null || ordersMap1.isEmpty()) {
+            throw new RuntimeException("Bulk write operation test failed. Transaction was committed, but " +
+                    "no objects were persisted into Cassandra");
+        }
+
+        if (productsMap1.size() < productsMap.size() || ordersMap1.size() < ordersMap.size()) {
+            throw new RuntimeException("Bulk write operation test failed. There were committed less objects " +
+                    "into Cassandra than expected");
+        }
+
+        if (productsMap1.size() > productsMap.size() || ordersMap1.size() > ordersMap.size()) {
+            throw new RuntimeException("Bulk write operation test failed. There were committed more objects " +
+                    "into Cassandra than expected");
+        }
+
+        for (Map.Entry<Long, Product> entry : productsMap.entrySet()) {
+            product = productsMap1.get(entry.getKey());
+
+            if (!entry.getValue().equals(product)) {
+                throw new RuntimeException("Bulk write operation test failed. Transaction was committed, but " +
+                        "some objects were incorrectly persisted/loaded to/from Cassandra");
+            }
+        }
+
+        for (Map.Entry<Long, ProductOrder> entry : ordersMap.entrySet()) {
+            order = ordersMap1.get(entry.getKey());
+
+            if (!entry.getValue().equals(order)) {
+                throw new RuntimeException("Bulk write operation test failed. Transaction was committed, but " +
+                        "some objects were incorrectly persisted/loaded to/from Cassandra");
+            }
+        }
+
+        LOGGER.info("Bulk operation write tests passed");
+
+        LOGGER.info("POJO strategy write tests passed");
+
+        LOGGER.info("Running POJO strategy delete tests");
+
+        LOGGER.info("Running single delete tests");
+
+        tx = txs.txStart(concurrency, isolation);
+
+        try {
+            productCache.remove(-1L);
+            orderCache.remove(-1L);
+
+            if (productStore.load(-1L) == null || orderStore.load(-1L) == null) {
+                throw new RuntimeException("Single delete operation test failed. Transaction wasn't committed yet, but " +
+                        "objects were already deleted from Cassandra");
+            }
+
+            tx.commit();
+        }
+        finally {
+            U.closeQuiet(tx);
+        }
+
+        if (productStore.load(-1L) != null || orderStore.load(-1L) != null) {
+            throw new RuntimeException("Single delete operation test failed. Transaction was committed, but " +
+                    "objects were not deleted from Cassandra");
+        }
+
+        LOGGER.info("Single delete tests passed");
+
+        LOGGER.info("Running bulk delete tests");
+
+        tx = txs.txStart(concurrency, isolation);
+
+        try {
+            productCache.removeAll(productsMap.keySet());
+            orderCache.removeAll(ordersMap.keySet());
+
+            productsMap1 = (Map<Long, Product>)productStore.loadAll(productsMap.keySet());
+            ordersMap1 = (Map<Long, ProductOrder>)orderStore.loadAll(ordersMap.keySet());
+
+            if (productsMap1.size() != productsMap.size() || ordersMap1.size() != ordersMap.size()) {
+                throw new RuntimeException("Bulk delete operation test failed. Transaction wasn't committed yet, but " +
+                        "objects were already deleted from Cassandra");
+            }
+
+            tx.commit();
+        }
+        finally {
+            U.closeQuiet(tx);
+        }
+
+        productsMap1 = (Map<Long, Product>)productStore.loadAll(productsMap.keySet());
+        ordersMap1 = (Map<Long, ProductOrder>)orderStore.loadAll(ordersMap.keySet());
+
+        if ((productsMap1 != null && !productsMap1.isEmpty()) || (ordersMap1 != null && !ordersMap1.isEmpty())) {
+            throw new RuntimeException("Bulk delete operation test failed. Transaction was committed, but " +
+                    "objects were not deleted from Cassandra");
+        }
+
+        LOGGER.info("Bulk delete tests passed");
+
+        LOGGER.info("POJO strategy delete tests passed");
+
+        LOGGER.info("-----------------------------------------------------------------------------------");
+        LOGGER.info("Passed POJO transaction tests for " + concurrency +
+                " concurrency and " + isolation + " isolation level");
+        LOGGER.info("-----------------------------------------------------------------------------------");
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/test/java/org/apache/ignite/tests/pojos/Product.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/pojos/Product.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/pojos/Product.java
new file mode 100644
index 0000000..f8eadf4
--- /dev/null
+++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/pojos/Product.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.pojos;
+
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+
+/**
+ * Simple POJO to store information about product
+ */
+public class Product {
+    /** */
+    private long id;
+
+    /** */
+    private String type;
+
+    /** */
+    private String title;
+
+    /** */
+    private String description;
+
+    /** */
+    private float price;
+
+    /** */
+    public Product() {
+    }
+
+    /** */
+    public Product(long id, String type, String title, String description, float price) {
+        this.id = id;
+        this.type = type;
+        this.title = title;
+        this.description = description;
+        this.price = price;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return ((Long)id).hashCode();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object obj) {
+        return obj instanceof Product && id == ((Product) obj).id;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return id + ", " + price + ", " + type + ", " + title + ", " + description;
+    }
+
+    /** */
+    public void setId(long id) {
+        this.id = id;
+    }
+
+    /** */
+    @QuerySqlField(index = true)
+    public long getId() {
+        return id;
+    }
+
+    /** */
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    /** */
+    @QuerySqlField
+    public String getType() {
+        return type;
+    }
+
+    /** */
+    public void setTitle(String title) {
+        this.title = title;
+    }
+
+    /** */
+    @QuerySqlField(index = true)
+    public String getTitle() {
+        return title;
+    }
+
+    /** */
+    public void setDescription(String description) {
+        this.description = description;
+    }
+
+    /** */
+    @QuerySqlField
+    public String getDescription() {
+        return description;
+    }
+
+    /** */
+    public void setPrice(float price) {
+        this.price = price;
+    }
+
+    /** */
+    @QuerySqlField
+    public float getPrice() {
+        return price;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/test/java/org/apache/ignite/tests/pojos/ProductOrder.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/pojos/ProductOrder.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/pojos/ProductOrder.java
new file mode 100644
index 0000000..4baee83
--- /dev/null
+++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/pojos/ProductOrder.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.pojos;
+
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+/**
+ * Simple POJO to store information about product order
+ */
+public class ProductOrder {
+    /** */
+    private static final DateFormat FORMAT = new SimpleDateFormat("MM/dd/yyyy/S");
+
+    /** */
+    private static final DateFormat FULL_FORMAT = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss:S");
+
+    /** */
+    private long id;
+
+    /** */
+    private long productId;
+
+    /** */
+    private Date date;
+
+    /** */
+    private int amount;
+
+    /** */
+    private float price;
+
+    /** */
+    public ProductOrder() {
+    }
+
+    /** */
+    public ProductOrder(long id, Product product, Date date, int amount) {
+        this(id, product.getId(), product.getPrice(), date, amount);
+    }
+
+    /** */
+    public ProductOrder(long id, long productId, float productPrice, Date date, int amount) {
+        this.id = id;
+        this.productId = productId;
+        this.date = date;
+        this.amount = amount;
+        this.price = productPrice * amount;
+
+        // if user ordered more than 10 items provide 5% discount
+        if (amount > 10)
+            this.price = this.price * 0.95F;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return ((Long)id).hashCode();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object obj) {
+        return obj instanceof ProductOrder && id == ((ProductOrder) obj).id;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return id + ", " + productId + ", " + FULL_FORMAT.format(date) + ", " + getDayMillisecond() + ", " + amount + ", " + price;
+    }
+
+    /** */
+    public void setId(long id) {
+        this.id = id;
+    }
+
+    /** */
+    @QuerySqlField(index = true)
+    public long getId() {
+        return id;
+    }
+
+    /** */
+    public void setProductId(long productId) {
+        this.productId = productId;
+    }
+
+    /** */
+    @QuerySqlField(index = true)
+    public long getProductId() {
+        return productId;
+    }
+
+    /** */
+    public void setDate(Date date) {
+        this.date = date;
+    }
+
+    /** */
+    @QuerySqlField
+    public Date getDate() {
+        return date;
+    }
+
+    /** */
+    public void setAmount(int amount) {
+        this.amount = amount;
+    }
+
+    /** */
+    @QuerySqlField
+    public int getAmount() {
+        return amount;
+    }
+
+    /** */
+    public void setPrice(float price) {
+        this.price = price;
+    }
+
+    /** */
+    @QuerySqlField
+    public float getPrice() {
+        return price;
+    }
+
+    /** */
+    @QuerySqlField
+    public String getDayMillisecond() {
+        return FORMAT.format(date);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/CacheStoreHelper.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/CacheStoreHelper.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/CacheStoreHelper.java
index b5ff5ad..9bcda6e 100644
--- a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/CacheStoreHelper.java
+++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/CacheStoreHelper.java
@@ -19,6 +19,7 @@ package org.apache.ignite.tests.utils;
 
 import java.lang.reflect.Field;
 import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreSession;
 import org.apache.ignite.cache.store.cassandra.CassandraCacheStore;
 import org.apache.ignite.cache.store.cassandra.datasource.DataSource;
 import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings;
@@ -35,12 +36,24 @@ public class CacheStoreHelper {
 
     /** */
     public static CacheStore createCacheStore(String cacheName, Resource persistenceSettings, DataSource conn) {
-        return createCacheStore(cacheName, persistenceSettings, conn, LOGGER);
+        return createCacheStore(cacheName, persistenceSettings, conn, null, LOGGER);
     }
 
     /** */
     public static CacheStore createCacheStore(String cacheName, Resource persistenceSettings, DataSource conn,
-        Logger log) {
+                                              CacheStoreSession session) {
+        return createCacheStore(cacheName, persistenceSettings, conn, session, LOGGER);
+    }
+
+    /** */
+    public static CacheStore createCacheStore(String cacheName, Resource persistenceSettings, DataSource conn,
+                                              Logger log) {
+        return createCacheStore(cacheName, persistenceSettings, conn, null, log);
+    }
+
+    /** */
+    public static CacheStore createCacheStore(String cacheName, Resource persistenceSettings, DataSource conn,
+                                              CacheStoreSession session, Logger log) {
         CassandraCacheStore<Integer, Integer> cacheStore =
             new CassandraCacheStore<>(conn, new KeyValuePersistenceSettings(persistenceSettings),
                 Runtime.getRuntime().availableProcessors());
@@ -52,7 +65,7 @@ public class CacheStoreHelper {
             sesField.setAccessible(true);
             logField.setAccessible(true);
 
-            sesField.set(cacheStore, new TestCacheSession(cacheName));
+            sesField.set(cacheStore, session != null ? session : new TestCacheSession(cacheName));
             logField.set(cacheStore, new Log4JLogger(log));
         }
         catch (Throwable e) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestCacheSession.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestCacheSession.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestCacheSession.java
index 1cedb7a..3cb47e9 100644
--- a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestCacheSession.java
+++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestCacheSession.java
@@ -34,7 +34,7 @@ public class TestCacheSession implements CacheStoreSession {
     private Transaction tx;
 
     /** */
-    private Map<Object, Object> props;
+    private Map<Object, Object> props = U.newHashMap(1);
 
     /** */
     private Object attach;
@@ -45,6 +45,13 @@ public class TestCacheSession implements CacheStoreSession {
     }
 
     /** */
+    public TestCacheSession(String cacheName, Transaction tx, Map<Object, Object> props) {
+        this.cacheName = cacheName;
+        this.tx = tx;
+        this.props = props;
+    }
+
+    /** */
     @SuppressWarnings("UnusedDeclaration")
     public void newSession(@Nullable Transaction tx) {
         this.tx = tx;
@@ -78,9 +85,6 @@ public class TestCacheSession implements CacheStoreSession {
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public <K, V> Map<K, V> properties() {
-        if (props == null)
-            props = U.newHashMap(1);
-
         return (Map<K, V>)props;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java
new file mode 100644
index 0000000..cda6715
--- /dev/null
+++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.utils;
+
+import org.apache.ignite.lang.IgniteAsyncSupport;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.apache.ignite.transactions.TransactionState;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.UUID;
+
+/**
+ * Dummy transaction for test purposes.
+ */
+public class TestTransaction implements Transaction {
+    /** */
+    private final IgniteUuid xid = IgniteUuid.randomUuid();
+
+    /** {@inheritDoc} */
+    @Nullable
+    @Override public IgniteUuid xid() {
+        return xid;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public UUID nodeId() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long threadId() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long startTime() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public TransactionIsolation isolation() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public TransactionConcurrency concurrency() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean implicit() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isInvalidate() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public TransactionState state() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long timeout() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long timeout(long timeout) {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean setRollbackOnly() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isRollbackOnly() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void commit() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteAsyncSupport withAsync() {
+        throw new UnsupportedOperationException();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isAsync() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <R> IgniteFuture<R> future() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void rollback() {
+        // No-op.
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestsHelper.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestsHelper.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestsHelper.java
index 2e266f6..24d64c9 100644
--- a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestsHelper.java
+++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestsHelper.java
@@ -17,19 +17,27 @@
 
 package org.apache.ignite.tests.utils;
 
+
+import org.apache.ignite.cache.store.cassandra.common.SystemHelper;
+import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
+import org.apache.ignite.tests.load.Generator;
+import org.apache.ignite.tests.pojos.Person;
+import org.apache.ignite.tests.pojos.PersonId;
+import org.apache.ignite.tests.pojos.Product;
+import org.apache.ignite.tests.pojos.ProductOrder;
+import org.springframework.core.io.ClassPathResource;
+
 import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.HashSet;
 import java.util.Random;
 import java.util.ResourceBundle;
-import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
-import org.apache.ignite.tests.load.Generator;
-import org.apache.ignite.tests.pojos.Person;
-import org.apache.ignite.tests.pojos.PersonId;
-import org.springframework.core.io.ClassPathResource;
+import java.util.Calendar;
+import java.util.Date;
 
 /**
  * Helper class for all tests
@@ -66,6 +74,21 @@ public class TestsHelper {
     private static final int LOAD_TESTS_REQUESTS_LATENCY = parseTestSettings("load.tests.requests.latency");
 
     /** */
+    private static final int TRANSACTION_PRODUCTS_COUNT = parseTestSettings("transaction.products.count");
+
+    /** */
+    private static final int TRANSACTION_ORDERS_COUNT = parseTestSettings("transaction.orders.count");
+
+    /** */
+    private static final int ORDERS_YEAR;
+
+    /** */
+    private static final int ORDERS_MONTH;
+
+    /** */
+    private static final int ORDERS_DAY;
+
+    /** */
     private static final String LOAD_TESTS_PERSISTENCE_SETTINGS = TESTS_SETTINGS.getString("load.tests.persistence.settings");
 
     /** */
@@ -78,14 +101,30 @@ public class TestsHelper {
     private static final Generator LOAD_TESTS_VALUE_GENERATOR;
 
     /** */
-    private static int parseTestSettings(String name) {
-        return Integer.parseInt(TESTS_SETTINGS.getString(name));
-    }
+    private static final String HOST_PREFIX;
 
     static {
         try {
             LOAD_TESTS_KEY_GENERATOR = (Generator)Class.forName(TESTS_SETTINGS.getString("load.tests.key.generator")).newInstance();
             LOAD_TESTS_VALUE_GENERATOR = (Generator)Class.forName(TESTS_SETTINGS.getString("load.tests.value.generator")).newInstance();
+
+            String[] parts = SystemHelper.HOST_IP.split("\\.");
+
+            String prefix = parts[3];
+            prefix = prefix.length() > 2 ? prefix.substring(prefix.length() - 2) : prefix;
+
+            HOST_PREFIX = prefix;
+
+            Calendar cl = Calendar.getInstance();
+
+            String year = TESTS_SETTINGS.getString("orders.year");
+            ORDERS_YEAR = !year.trim().isEmpty() ? Integer.parseInt(year) : cl.get(Calendar.YEAR);
+
+            String month = TESTS_SETTINGS.getString("orders.month");
+            ORDERS_MONTH = !month.trim().isEmpty() ? Integer.parseInt(month) : cl.get(Calendar.MONTH);
+
+            String day = TESTS_SETTINGS.getString("orders.day");
+            ORDERS_DAY = !day.trim().isEmpty() ? Integer.parseInt(day) : cl.get(Calendar.DAY_OF_MONTH);
         }
         catch (Throwable e) {
             throw new RuntimeException("Failed to initialize TestsHelper", e);
@@ -93,6 +132,11 @@ public class TestsHelper {
     }
 
     /** */
+    private static int parseTestSettings(String name) {
+        return Integer.parseInt(TESTS_SETTINGS.getString(name));
+    }
+
+    /** */
     public static int getLoadTestsThreadsCount() {
         return LOAD_TESTS_THREADS_COUNT;
     }
@@ -275,6 +319,130 @@ public class TestsHelper {
     }
 
     /** */
+    public static List<CacheEntryImpl<Long, Product>> generateProductEntries() {
+        List<CacheEntryImpl<Long, Product>> entries = new LinkedList<>();
+
+        for (long i = 0; i < BULK_OPERATION_SIZE; i++)
+            entries.add(new CacheEntryImpl<>(i, generateRandomProduct(i)));
+
+        return entries;
+    }
+
+    /** */
+    public static Collection<Long> getProductIds(Collection<CacheEntryImpl<Long, Product>> entries) {
+        List<Long> ids = new LinkedList<>();
+
+        for (CacheEntryImpl<Long, Product> entry : entries)
+            ids.add(entry.getKey());
+
+        return ids;
+    }
+
+    /** */
+    public static Map<Long, Product> generateProductsMap() {
+        return generateProductsMap(BULK_OPERATION_SIZE);
+    }
+
+    /** */
+    public static Map<Long, Product> generateProductsMap(int count) {
+        Map<Long, Product> map = new HashMap<>();
+
+        for (long i = 0; i < count; i++)
+            map.put(i, generateRandomProduct(i));
+
+        return map;
+    }
+
+    /** */
+    public static Collection<CacheEntryImpl<Long, ProductOrder>> generateOrderEntries() {
+        Collection<CacheEntryImpl<Long, ProductOrder>> entries = new LinkedList<>();
+
+        for (long i = 0; i < BULK_OPERATION_SIZE; i++) {
+            ProductOrder order = generateRandomOrder(i);
+            entries.add(new CacheEntryImpl<>(order.getId(), order));
+        }
+
+        return entries;
+    }
+
+    /** */
+    public static Map<Long, ProductOrder> generateOrdersMap() {
+        return generateOrdersMap(BULK_OPERATION_SIZE);
+    }
+
+    /** */
+    public static Map<Long, ProductOrder> generateOrdersMap(int count) {
+        Map<Long, ProductOrder> map = new HashMap<>();
+
+        for (long i = 0; i < count; i++) {
+            ProductOrder order = generateRandomOrder(i);
+            map.put(order.getId(), order);
+        }
+
+        return map;
+    }
+
+    /** */
+    public static Map<Long, List<CacheEntryImpl<Long, ProductOrder>>> generateOrdersPerProductEntries(
+            Collection<CacheEntryImpl<Long, Product>> products) {
+        return generateOrdersPerProductEntries(products, TRANSACTION_ORDERS_COUNT);
+    }
+
+    /** */
+    public static Map<Long, List<CacheEntryImpl<Long, ProductOrder>>> generateOrdersPerProductEntries(
+            Collection<CacheEntryImpl<Long, Product>> products, int ordersPerProductCount) {
+        Map<Long, List<CacheEntryImpl<Long, ProductOrder>>> map = new HashMap<>();
+
+        for (CacheEntryImpl<Long, Product> entry : products) {
+            List<CacheEntryImpl<Long, ProductOrder>> orders = new LinkedList<>();
+
+            for (long i = 0; i < ordersPerProductCount; i++) {
+                ProductOrder order = generateRandomOrder(entry.getKey());
+                orders.add(new CacheEntryImpl<>(order.getId(), order));
+            }
+
+            map.put(entry.getKey(), orders);
+        }
+
+        return map;
+    }
+
+    /** */
+    public static Map<Long, Map<Long, ProductOrder>> generateOrdersPerProductMap(Map<Long, Product> products) {
+        return generateOrdersPerProductMap(products, TRANSACTION_ORDERS_COUNT);
+    }
+
+    /** */
+    public static Map<Long, Map<Long, ProductOrder>> generateOrdersPerProductMap(Map<Long, Product> products,
+                                                                                 int ordersPerProductCount) {
+        Map<Long, Map<Long, ProductOrder>> map = new HashMap<>();
+
+        for (Map.Entry<Long, Product> entry : products.entrySet()) {
+            Map<Long, ProductOrder> orders = new HashMap<>();
+
+            for (long i = 0; i < ordersPerProductCount; i++) {
+                ProductOrder order = generateRandomOrder(entry.getKey());
+                orders.put(order.getId(), order);
+            }
+
+            map.put(entry.getKey(), orders);
+        }
+
+        return map;
+    }
+
+    public static Collection<Long> getOrderIds(Map<Long, List<CacheEntryImpl<Long, ProductOrder>>> orders) {
+        Set<Long> ids = new HashSet<>();
+
+        for (Long key : orders.keySet()) {
+            for (CacheEntryImpl<Long, ProductOrder> entry : orders.get(key))
+                ids.add(entry.getKey());
+        }
+
+        return ids;
+    }
+
+    /** */
     public static Person generateRandomPerson(long personNum) {
         int phonesCnt = RANDOM.nextInt(4);
 
@@ -293,6 +461,33 @@ public class TestsHelper {
     }
 
     /** */
+    public static Product generateRandomProduct(long id) {
+        return new Product(id, randomString(2), randomString(6), randomString(20), generateProductPrice(id));
+    }
+
+    /** */
+    public static ProductOrder generateRandomOrder(long productId) {
+        return generateRandomOrder(productId, RANDOM.nextInt(10000));
+    }
+
+    /** */
+    private static ProductOrder generateRandomOrder(long productId, int saltedNumber) {
+        Calendar cl = Calendar.getInstance();
+        cl.set(Calendar.YEAR, ORDERS_YEAR);
+        cl.set(Calendar.MONTH, ORDERS_MONTH);
+        cl.set(Calendar.DAY_OF_MONTH, ORDERS_DAY);
+
+        long id = Long.parseLong(productId + System.currentTimeMillis() + HOST_PREFIX + saltedNumber);
+
+        return generateRandomOrder(id, productId, cl.getTime());
+    }
+
+    /** */
+    public static ProductOrder generateRandomOrder(long id, long productId, Date date) {
+        return new ProductOrder(id, productId, generateProductPrice(productId), date, 1 + RANDOM.nextInt(20));
+    }
+
+    /** */
     public static boolean checkMapsEqual(Map map1, Map map2) {
         if (map1 == null || map2 == null || map1.size() != map2.size())
             return false;
@@ -360,6 +555,66 @@ public class TestsHelper {
     }
 
     /** */
+    public static <K> boolean checkProductCollectionsEqual(Map<K, Product> map, Collection<CacheEntryImpl<K, Product>> col) {
+        if (map == null || col == null || map.size() != col.size())
+            return false;
+
+        for (CacheEntryImpl<K, Product> entry : col)
+            if (!entry.getValue().equals(map.get(entry.getKey())))
+                return false;
+
+        return true;
+    }
+
+    /** */
+    public static <K> boolean checkProductMapsEqual(Map<K, Product> map1, Map<K, Product> map2) {
+        if (map1 == null || map2 == null || map1.size() != map2.size())
+            return false;
+
+        for (K key : map1.keySet()) {
+            Product product1 = map1.get(key);
+            Product product2 = map2.get(key);
+
+            boolean equals = product1 != null && product2 != null && product1.equals(product2);
+
+            if (!equals)
+                return false;
+        }
+
+        return true;
+    }
+
+    /** */
+    public static <K> boolean checkOrderCollectionsEqual(Map<K, ProductOrder> map, Collection<CacheEntryImpl<K, ProductOrder>> col) {
+        if (map == null || col == null || map.size() != col.size())
+            return false;
+
+        for (CacheEntryImpl<K, ProductOrder> entry : col)
+            if (!entry.getValue().equals(map.get(entry.getKey())))
+                return false;
+
+        return true;
+    }
+
+    /** */
+    public static <K> boolean checkOrderMapsEqual(Map<K, ProductOrder> map1, Map<K, ProductOrder> map2) {
+        if (map1 == null || map2 == null || map1.size() != map2.size())
+            return false;
+
+        for (K key : map1.keySet()) {
+            ProductOrder order1 = map1.get(key);
+            ProductOrder order2 = map2.get(key);
+
+            boolean equals = order1 != null && order2 != null && order1.equals(order2);
+
+            if (!equals)
+                return false;
+        }
+
+        return true;
+    }
+
+    /** */
     public static String randomString(int len) {
         StringBuilder builder = new StringBuilder(len);
 
@@ -378,4 +633,28 @@ public class TestsHelper {
 
         return builder.toString();
     }
+
+    /** */
+    private static float generateProductPrice(long productId) {
+        long id = productId < 1000 ?
+                (((productId + 1) * (productId + 1) * 1000) / 2) * 10 :
+                (productId / 20) * (productId / 20);
+
+        id = id == 0 ? 24 : id;
+
+        float price = Long.parseLong(Long.toString(id).replace("0", ""));
+
+        int i = 0;
+
+        while (price > 100) {
+            if (i % 2 != 0)
+                price = price / 2;
+            else
+                price = (float) Math.sqrt(price);
+
+            i++;
+        }
+
+        return ((float)((int)(price * 100))) / 100.0F;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/pojo/ignite-config.xml
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/pojo/ignite-config.xml b/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/pojo/ignite-config.xml
index cd23a2a..c9b45c8 100644
--- a/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/pojo/ignite-config.xml
+++ b/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/pojo/ignite-config.xml
@@ -46,6 +46,16 @@
         <constructor-arg type="org.springframework.core.io.Resource" value="classpath:org/apache/ignite/tests/persistence/pojo/persistence-settings-4.xml" />
     </bean>
 
+    <!-- Persistence settings for 'product' -->
+    <bean id="product_persistence_settings" class="org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings">
+        <constructor-arg type="org.springframework.core.io.Resource" value="classpath:org/apache/ignite/tests/persistence/pojo/product.xml" />
+    </bean>
+
+    <!-- Persistence settings for 'order' -->
+    <bean id="order_persistence_settings" class="org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings">
+        <constructor-arg type="org.springframework.core.io.Resource" value="classpath:org/apache/ignite/tests/persistence/pojo/order.xml" />
+    </bean>
+
     <!-- Ignite configuration -->
     <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
         <property name="cacheConfiguration">
@@ -89,7 +99,7 @@
                     </property>
                 </bean>
 
-                <!-- Configuring persistence for "cache3" cache -->
+                <!-- Configuring persistence for "cache4" cache -->
                 <bean class="org.apache.ignite.configuration.CacheConfiguration">
                     <property name="name" value="cache4"/>
                     <property name="readThrough" value="true"/>
@@ -101,6 +111,35 @@
                         </bean>
                     </property>
                 </bean>
+
+                <!-- Configuring persistence for "product" cache -->
+                <bean class="org.apache.ignite.configuration.CacheConfiguration">
+                    <property name="name" value="product"/>
+                    <property name="readThrough" value="true"/>
+                    <property name="writeThrough" value="true"/>
+                    <property name="atomicityMode" value="TRANSACTIONAL"/>
+                    <property name="cacheStoreFactory">
+                        <bean class="org.apache.ignite.cache.store.cassandra.CassandraCacheStoreFactory">
+                            <property name="dataSourceBean" value="cassandraAdminDataSource"/>
+                            <property name="persistenceSettingsBean" value="product_persistence_settings"/>
+                        </bean>
+                    </property>
+                </bean>
+
+                <!-- Configuring persistence for "order" cache -->
+                <bean class="org.apache.ignite.configuration.CacheConfiguration">
+                    <property name="name" value="order"/>
+                    <property name="readThrough" value="true"/>
+                    <property name="writeThrough" value="true"/>
+                    <property name="atomicityMode" value="TRANSACTIONAL"/>
+                    <property name="cacheStoreFactory">
+                        <bean class="org.apache.ignite.cache.store.cassandra.CassandraCacheStoreFactory">
+                            <property name="dataSourceBean" value="cassandraAdminDataSource"/>
+                            <property name="persistenceSettingsBean" value="order_persistence_settings"/>
+                        </bean>
+                    </property>
+                </bean>
+
             </list>
         </property>
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/pojo/order.xml
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/pojo/order.xml b/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/pojo/order.xml
new file mode 100644
index 0000000..d616364
--- /dev/null
+++ b/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/pojo/order.xml
@@ -0,0 +1,21 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<persistence keyspace="test1" table="order">
+    <keyPersistence class="java.lang.Long" column="id" strategy="PRIMITIVE" />
+    <valuePersistence class="org.apache.ignite.tests.pojos.ProductOrder" strategy="POJO" />
+</persistence>

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/pojo/product.xml
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/pojo/product.xml b/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/pojo/product.xml
new file mode 100644
index 0000000..c761e1c
--- /dev/null
+++ b/modules/cassandra/store/src/test/resources/org/apache/ignite/tests/persistence/pojo/product.xml
@@ -0,0 +1,21 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<persistence keyspace="test1" table="product">
+    <keyPersistence class="java.lang.Long" column="id" strategy="PRIMITIVE" />
+    <valuePersistence class="org.apache.ignite.tests.pojos.Product" strategy="POJO" />
+</persistence>

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/test/resources/tests.properties
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/resources/tests.properties b/modules/cassandra/store/src/test/resources/tests.properties
index 2c91e57..b11f2c8 100644
--- a/modules/cassandra/store/src/test/resources/tests.properties
+++ b/modules/cassandra/store/src/test/resources/tests.properties
@@ -16,6 +16,21 @@
 # Number of elements for CacheStore bulk operations: loadAll, writeAll, deleteAll
 bulk.operation.size=100
 
+# Number of product per transaction
+transaction.products.count=2
+
+# Number of orders per transaction
+transaction.orders.count=10
+
+# Year to use for generating new orders
+orders.year=
+
+# Month to use for generating new orders
+orders.month=
+
+# Day of month to use for generating new orders
+orders.day=
+
 # ----- Load tests settings -----
 
 # Ignite cache to be used by load tests


[2/3] ignite git commit: IGNITE-3609 Utilize Cassandra logged batches for transactions. - Fixes #1111.

Posted by ak...@apache.org.
IGNITE-3609 Utilize Cassandra logged batches for transactions. - Fixes #1111.

Signed-off-by: Alexey Kuznetsov <ak...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3b8aca64
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3b8aca64
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3b8aca64

Branch: refs/heads/master
Commit: 3b8aca64b8ebe6ba21f5d02f50cf69ad46dbbc95
Parents: 00576d8
Author: Igor <ir...@gmail.com>
Authored: Fri Sep 30 14:39:30 2016 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Fri Sep 30 14:39:30 2016 +0700

----------------------------------------------------------------------
 .../store/cassandra/CassandraCacheStore.java    | 112 ++++--
 .../store/cassandra/common/CassandraHelper.java |  29 +-
 .../store/cassandra/persistence/PojoField.java  |   9 +-
 .../cassandra/persistence/PojoValueField.java   |   2 -
 .../cassandra/session/CassandraSession.java     |  10 +
 .../cassandra/session/CassandraSessionImpl.java | 113 +++++-
 .../session/transaction/BaseMutation.java       |  68 ++++
 .../session/transaction/DeleteMutation.java     |  57 +++
 .../cassandra/session/transaction/Mutation.java |  64 +++
 .../session/transaction/WriteMutation.java      |  60 +++
 .../session/transaction/package-info.java       |  21 +
 .../tests/CassandraDirectPersistenceTest.java   | 396 ++++++++++++++++---
 .../ignite/tests/CassandraLocalServer.java      |  58 +++
 .../apache/ignite/tests/DDLGeneratorTest.java   |  35 +-
 .../ignite/tests/IgnitePersistentStoreTest.java | 265 +++++++++++++
 .../org/apache/ignite/tests/pojos/Product.java  | 123 ++++++
 .../apache/ignite/tests/pojos/ProductOrder.java | 148 +++++++
 .../ignite/tests/utils/CacheStoreHelper.java    |  19 +-
 .../ignite/tests/utils/TestCacheSession.java    |  12 +-
 .../ignite/tests/utils/TestTransaction.java     | 133 +++++++
 .../apache/ignite/tests/utils/TestsHelper.java  | 299 +++++++++++++-
 .../tests/persistence/pojo/ignite-config.xml    |  41 +-
 .../ignite/tests/persistence/pojo/order.xml     |  21 +
 .../ignite/tests/persistence/pojo/product.xml   |  21 +
 .../store/src/test/resources/tests.properties   |  15 +
 25 files changed, 2005 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
index 6aef0c4..aead39a 100644
--- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStore.java
@@ -22,8 +22,10 @@ import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Row;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
+import java.util.HashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -41,6 +43,9 @@ import org.apache.ignite.cache.store.cassandra.session.CassandraSession;
 import org.apache.ignite.cache.store.cassandra.session.ExecutionAssistant;
 import org.apache.ignite.cache.store.cassandra.session.GenericBatchExecutionAssistant;
 import org.apache.ignite.cache.store.cassandra.session.LoadCacheCustomQueryWorker;
+import org.apache.ignite.cache.store.cassandra.session.transaction.DeleteMutation;
+import org.apache.ignite.cache.store.cassandra.session.transaction.Mutation;
+import org.apache.ignite.cache.store.cassandra.session.transaction.WriteMutation;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.logger.NullLogger;
@@ -54,14 +59,16 @@ import org.apache.ignite.resources.LoggerResource;
  * @param <V> Ignite cache value type.
  */
 public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
-    /** Connection attribute property name. */
-    private static final String ATTR_CONN_PROP = "CASSANDRA_STORE_CONNECTION";
+    /** Buffer to store mutations performed withing transaction. */
+    private static final String TRANSACTION_BUFFER = "CASSANDRA_TRANSACTION_BUFFER";
 
     /** Auto-injected store session. */
+    @SuppressWarnings("unused")
     @CacheStoreSessionResource
     private CacheStoreSession storeSes;
 
     /** Auto-injected logger instance. */
+    @SuppressWarnings("unused")
     @LoggerResource
     private IgniteLogger log;
 
@@ -127,12 +134,22 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
 
     /** {@inheritDoc} */
     @Override public void sessionEnd(boolean commit) throws CacheWriterException {
-        if (storeSes == null || storeSes.transaction() == null)
+        if (!storeSes.isWithinTransaction())
             return;
 
-        CassandraSession cassandraSes = (CassandraSession) storeSes.properties().remove(ATTR_CONN_PROP);
+        List<Mutation> mutations = mutations();
+        if (mutations == null || mutations.isEmpty())
+            return;
 
-        U.closeQuiet(cassandraSes);
+        CassandraSession ses = getCassandraSession();
+
+        try {
+            ses.execute(mutations);
+        }
+        finally {
+            mutations.clear();
+            U.closeQuiet(ses);
+        }
     }
 
     /** {@inheritDoc} */
@@ -182,7 +199,7 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
             });
         }
         finally {
-            closeCassandraSession(ses);
+            U.closeQuiet(ses);
         }
     }
 
@@ -235,7 +252,7 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
             }, keys);
         }
         finally {
-            closeCassandraSession(ses);
+            U.closeQuiet(ses);
         }
     }
 
@@ -244,6 +261,11 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
         if (entry == null || entry.getKey() == null)
             return;
 
+        if (storeSes.isWithinTransaction()) {
+            accumulate(new WriteMutation(entry, cassandraTable(), controller));
+            return;
+        }
+
         CassandraSession ses = getCassandraSession();
 
         try {
@@ -285,7 +307,7 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
             });
         }
         finally {
-            closeCassandraSession(ses);
+            U.closeQuiet(ses);
         }
     }
 
@@ -294,6 +316,13 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
         if (entries == null || entries.isEmpty())
             return;
 
+        if (storeSes.isWithinTransaction()) {
+            for (Cache.Entry<?, ?> entry : entries)
+                accumulate(new WriteMutation(entry, cassandraTable(), controller));
+
+            return;
+        }
+
         CassandraSession ses = getCassandraSession();
 
         try {
@@ -331,7 +360,7 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
             }, entries);
         }
         finally {
-            closeCassandraSession(ses);
+            U.closeQuiet(ses);
         }
     }
 
@@ -340,6 +369,11 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
         if (key == null)
             return;
 
+        if (storeSes.isWithinTransaction()) {
+            accumulate(new DeleteMutation(key, cassandraTable(), controller));
+            return;
+        }
+
         CassandraSession ses = getCassandraSession();
 
         try {
@@ -382,7 +416,7 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
             });
         }
         finally {
-            closeCassandraSession(ses);
+            U.closeQuiet(ses);
         }
     }
 
@@ -391,6 +425,13 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
         if (keys == null || keys.isEmpty())
             return;
 
+        if (storeSes.isWithinTransaction()) {
+            for (Object key : keys)
+                accumulate(new DeleteMutation(key, cassandraTable(), controller));
+
+            return;
+        }
+
         CassandraSession ses = getCassandraSession();
 
         try {
@@ -422,7 +463,7 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
             }, keys);
         }
         finally {
-            closeCassandraSession(ses);
+            U.closeQuiet(ses);
         }
     }
 
@@ -433,36 +474,43 @@ public class CassandraCacheStore<K, V> implements CacheStore<K, V> {
      * @return Cassandra session wrapper.
      */
     private CassandraSession getCassandraSession() {
-        if (storeSes == null || storeSes.transaction() == null)
-            return dataSrc.session(log != null ? log : new NullLogger());
-
-        CassandraSession ses = (CassandraSession) storeSes.properties().get(ATTR_CONN_PROP);
-
-        if (ses == null) {
-            ses = dataSrc.session(log != null ? log : new NullLogger());
-            storeSes.properties().put(ATTR_CONN_PROP, ses);
-        }
+        return dataSrc.session(log != null ? log : new NullLogger());
+    }
 
-        return ses;
+    /**
+     * Returns table name to use for all Cassandra based operations (READ/WRITE/DELETE).
+     *
+     * @return Table name.
+     */
+    private String cassandraTable() {
+        return controller.getPersistenceSettings().getTable() != null ?
+            controller.getPersistenceSettings().getTable() : storeSes.cacheName().trim().toLowerCase();
     }
 
     /**
-     * Releases Cassandra related resources.
+     * Accumulates mutation in the transaction buffer.
      *
-     * @param ses Cassandra session wrapper.
+     * @param mutation Mutation operation.
      */
-    private void closeCassandraSession(CassandraSession ses) {
-        if (ses != null && (storeSes == null || storeSes.transaction() == null))
-            U.closeQuiet(ses);
+    private void accumulate(Mutation mutation) {
+        //noinspection unchecked
+        List<Mutation> mutations = (List<Mutation>)storeSes.properties().get(TRANSACTION_BUFFER);
+
+        if (mutations == null) {
+            mutations = new LinkedList<>();
+            storeSes.properties().put(TRANSACTION_BUFFER, mutations);
+        }
+
+        mutations.add(mutation);
     }
 
     /**
-     * Returns table name to use for all Cassandra based operations (READ/WRITE/DELETE).
+     * Returns all the mutations performed withing transaction.
      *
-     * @return Table name.
+     * @return Mutations
      */
-    private String cassandraTable() {
-        return controller.getPersistenceSettings().getTable() != null ?
-            controller.getPersistenceSettings().getTable() : storeSes.cacheName().toLowerCase();
+    private List<Mutation> mutations() {
+        //noinspection unchecked
+        return (List<Mutation>)storeSes.properties().get(TRANSACTION_BUFFER);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/CassandraHelper.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/CassandraHelper.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/CassandraHelper.java
index 9066112..badd5df 100644
--- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/CassandraHelper.java
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/CassandraHelper.java
@@ -20,9 +20,13 @@ package org.apache.ignite.cache.store.cassandra.common;
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.DataType;
 import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.DriverException;
 import com.datastax.driver.core.exceptions.InvalidQueryException;
 import com.datastax.driver.core.exceptions.NoHostAvailableException;
 import com.datastax.driver.core.exceptions.ReadTimeoutException;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
 import java.util.regex.Pattern;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
@@ -36,8 +40,15 @@ public class CassandraHelper {
     /** Cassandra error message if trying to create table inside nonexistent keyspace. */
     private static final Pattern KEYSPACE_EXIST_ERROR2 = Pattern.compile("Cannot add table '[0-9a-zA-Z_]+' to non existing keyspace.*");
 
+    /** Cassandra error message if trying to create table inside nonexistent keyspace. */
+    private static final Pattern KEYSPACE_EXIST_ERROR3 = Pattern.compile("Error preparing query, got ERROR INVALID: " +
+            "Keyspace [0-9a-zA-Z_]+ does not exist");
+
+    /** Cassandra error message if specified table doesn't exist. */
+    private static final Pattern TABLE_EXIST_ERROR1 = Pattern.compile("unconfigured table [0-9a-zA-Z_]+");
+
     /** Cassandra error message if specified table doesn't exist. */
-    private static final Pattern TABLE_EXIST_ERROR = Pattern.compile("unconfigured table [0-9a-zA-Z_]+");
+    private static final String TABLE_EXIST_ERROR2 = "Error preparing query, got ERROR INVALID: unconfigured table";
 
     /** Cassandra error message if trying to use prepared statement created from another session. */
     private static final String PREP_STATEMENT_CLUSTER_INSTANCE_ERROR = "You may have used a PreparedStatement that " +
@@ -85,11 +96,25 @@ public class CassandraHelper {
     public static boolean isTableAbsenceError(Throwable e) {
         while (e != null) {
             if (e instanceof InvalidQueryException &&
-                (TABLE_EXIST_ERROR.matcher(e.getMessage()).matches() ||
+                (TABLE_EXIST_ERROR1.matcher(e.getMessage()).matches() ||
                     KEYSPACE_EXIST_ERROR1.matcher(e.getMessage()).matches() ||
                     KEYSPACE_EXIST_ERROR2.matcher(e.getMessage()).matches()))
                 return true;
 
+            if (e instanceof NoHostAvailableException && ((NoHostAvailableException) e).getErrors() != null) {
+                NoHostAvailableException ex = (NoHostAvailableException)e;
+
+                for (Map.Entry<InetSocketAddress, Throwable> entry : ex.getErrors().entrySet()) {
+                    //noinspection ThrowableResultOfMethodCallIgnored
+                    Throwable error = entry.getValue();
+
+                    if (error instanceof DriverException &&
+                        (error.getMessage().contains(TABLE_EXIST_ERROR2) ||
+                             KEYSPACE_EXIST_ERROR3.matcher(error.getMessage()).matches()))
+                        return true;
+                }
+            }
+
             e = e.getCause();
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java
index d708a34..78e75a9 100644
--- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java
@@ -85,11 +85,10 @@ public abstract class PojoField implements Serializable {
     public PojoField(PropertyDescriptor desc) {
         this.name = desc.getName();
 
-        QuerySqlField sqlField = desc.getReadMethod() != null ?
-            desc.getReadMethod().getAnnotation(QuerySqlField.class) :
-            desc.getWriteMethod() == null ?
-                null :
-                desc.getWriteMethod().getAnnotation(QuerySqlField.class);
+        QuerySqlField sqlField = desc.getReadMethod() != null &&
+                desc.getReadMethod().getAnnotation(QuerySqlField.class) != null ?
+                desc.getReadMethod().getAnnotation(QuerySqlField.class) :
+                    desc.getWriteMethod() == null ? null : desc.getWriteMethod().getAnnotation(QuerySqlField.class);
 
         col = sqlField != null && sqlField.name() != null &&
             !sqlField.name().trim().isEmpty() ? sqlField.name() : name.toLowerCase();

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java
index c3512c3..3e636c0 100644
--- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java
@@ -146,7 +146,5 @@ public class PojoValueField extends PojoField {
      * @param sqlField {@link QuerySqlField} annotation.
      */
     protected void init(QuerySqlField sqlField) {
-        if (sqlField.index())
-            isIndexed = true;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSession.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSession.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSession.java
index 506982f..b0e50ec 100644
--- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSession.java
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSession.java
@@ -17,7 +17,10 @@
 
 package org.apache.ignite.cache.store.cassandra.session;
 
+import org.apache.ignite.cache.store.cassandra.session.transaction.Mutation;
+
 import java.io.Closeable;
+import java.util.List;
 
 /**
  * Wrapper around Cassandra driver session, to automatically handle:
@@ -57,4 +60,11 @@ public interface CassandraSession extends Closeable {
      * @param assistant execution assistance to perform the main operation logic.
      */
     public void execute(BatchLoaderAssistant assistant);
+
+    /**
+     * Executes all the mutations performed withing Ignite transaction against Cassandra database.
+     *
+     * @param mutations Mutations.
+     */
+    public void execute(List<Mutation> mutations);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
index d2c9e97..4857fa4 100644
--- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
@@ -43,6 +43,7 @@ import org.apache.ignite.cache.store.cassandra.common.CassandraHelper;
 import org.apache.ignite.cache.store.cassandra.common.RandomSleeper;
 import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings;
 import org.apache.ignite.cache.store.cassandra.session.pool.SessionPool;
+import org.apache.ignite.cache.store.cassandra.session.transaction.Mutation;
 import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
 
 /**
@@ -162,7 +163,8 @@ public class CassandraSessionImpl implements CassandraSession {
                         throw new IgniteException(errorMsg, e);
                 }
 
-                sleeper.sleep();
+                if (!CassandraHelper.isTableAbsenceError(error))
+                    sleeper.sleep();
 
                 attempt++;
             }
@@ -320,7 +322,8 @@ public class CassandraSessionImpl implements CassandraSession {
                     handlePreparedStatementClusterError(prepStatEx);
                 }
 
-                sleeper.sleep();
+                if (!CassandraHelper.isTableAbsenceError(error))
+                    sleeper.sleep();
 
                 attempt++;
             }
@@ -402,6 +405,103 @@ public class CassandraSessionImpl implements CassandraSession {
     }
 
     /** {@inheritDoc} */
+    @Override public void execute(List<Mutation> mutations) {
+        if (mutations == null || mutations.isEmpty())
+            return;
+
+        Throwable error = null;
+        String errorMsg = "Failed to apply " + mutations.size() + " mutations performed withing Ignite " +
+                "transaction into Cassandra";
+
+        int attempt = 0;
+        boolean tableExistenceRequired = false;
+        Map<String, PreparedStatement> statements = new HashMap<>();
+        Map<String, KeyValuePersistenceSettings> tableSettings = new HashMap<>();
+        RandomSleeper sleeper = newSleeper();
+
+        incrementSessionRefs();
+
+        try {
+            while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
+                error = null;
+
+                if (attempt != 0) {
+                    log.warning("Trying " + (attempt + 1) + " attempt to apply " + mutations.size() + " mutations " +
+                            "performed withing Ignite transaction into Cassandra");
+                }
+
+                try {
+                    BatchStatement batch = new BatchStatement();
+
+                    // accumulating all the mutations into one Cassandra logged batch
+                    for (Mutation mutation : mutations) {
+                        String key = mutation.getTable() + mutation.getClass().getName();
+                        PreparedStatement st = statements.get(key);
+
+                        if (st == null) {
+                            st = prepareStatement(mutation.getTable(), mutation.getStatement(),
+                                    mutation.getPersistenceSettings(), mutation.tableExistenceRequired());
+
+                            if (st != null)
+                                statements.put(key, st);
+                        }
+
+                        if (st != null)
+                            batch.add(mutation.bindStatement(st));
+
+                        if (attempt == 0) {
+                            if (mutation.tableExistenceRequired()) {
+                                tableExistenceRequired = true;
+
+                                if (!tableSettings.containsKey(mutation.getTable()))
+                                    tableSettings.put(mutation.getTable(), mutation.getPersistenceSettings());
+                            }
+                        }
+                    }
+
+                    // committing logged batch into Cassandra
+                    if (batch.size() > 0)
+                        session().execute(tuneStatementExecutionOptions(batch));
+
+                    return;
+                } catch (Throwable e) {
+                    error = e;
+
+                    if (CassandraHelper.isTableAbsenceError(e)) {
+                        if (tableExistenceRequired) {
+                            for (Map.Entry<String, KeyValuePersistenceSettings> entry : tableSettings.entrySet())
+                                handleTableAbsenceError(entry.getKey(), entry.getValue());
+                        }
+                        else
+                            return;
+                    } else if (CassandraHelper.isHostsAvailabilityError(e)) {
+                        if (handleHostsAvailabilityError(e, attempt, errorMsg))
+                            statements.clear();
+                    } else if (CassandraHelper.isPreparedStatementClusterError(e)) {
+                        handlePreparedStatementClusterError(e);
+                        statements.clear();
+                    } else {
+                        // For an error which we don't know how to handle, we will not try next attempts and terminate.
+                        throw new IgniteException(errorMsg, e);
+                    }
+                }
+
+                if (!CassandraHelper.isTableAbsenceError(error))
+                    sleeper.sleep();
+
+                attempt++;
+            }
+        } catch (Throwable e) {
+            error = e;
+        } finally {
+            decrementSessionRefs();
+        }
+
+        log.error(errorMsg, error);
+        throw new IgniteException(errorMsg, error);
+    }
+
+    /** {@inheritDoc} */
     @Override public synchronized void close() throws IOException {
         if (decrementSessionRefs() == 0 && ses != null) {
             SessionPool.put(this, ses);
@@ -517,7 +617,8 @@ public class CassandraSessionImpl implements CassandraSession {
                     error = e;
                 }
 
-                sleeper.sleep();
+                if (!CassandraHelper.isTableAbsenceError(error))
+                    sleeper.sleep();
 
                 attempt++;
             }
@@ -585,7 +686,7 @@ public class CassandraSessionImpl implements CassandraSession {
                 log.info("-----------------------------------------------------------------------");
                 log.info("Creating Cassandra table '" + tableFullName + "'");
                 log.info("-----------------------------------------------------------------------\n\n" +
-                        tableFullName + "\n");
+                        settings.getTableDDLStatement(table) + "\n");
                 log.info("-----------------------------------------------------------------------");
                 session().execute(settings.getTableDDLStatement(table));
                 log.info("Cassandra table '" + tableFullName + "' was successfully created");
@@ -634,10 +735,14 @@ public class CassandraSessionImpl implements CassandraSession {
 
         while (attempt < CQL_EXECUTION_ATTEMPTS_COUNT) {
             try {
+                log.info("-----------------------------------------------------------------------");
                 log.info("Creating indexes for Cassandra table '" + tableFullName + "'");
+                log.info("-----------------------------------------------------------------------");
 
                 for (String statement : indexDDLStatements) {
                     try {
+                        log.info(statement);
+                        log.info("-----------------------------------------------------------------------");
                         session().execute(statement);
                     }
                     catch (AlreadyExistsException ignored) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/BaseMutation.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/BaseMutation.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/BaseMutation.java
new file mode 100644
index 0000000..2625e87
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/BaseMutation.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.session.transaction;
+
+import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings;
+import org.apache.ignite.cache.store.cassandra.persistence.PersistenceController;
+
+/**
+ * Base class to inherit from to implement specific mutations operation.
+ */
+public abstract class BaseMutation implements Mutation {
+    /** Cassandra table to use. */
+    private final String table;
+
+    /** Persistence controller to be utilized for mutation. */
+    private final PersistenceController ctrl;
+
+    /**
+     * Creates instance of mutation operation.
+     *
+     * @param table Cassandra table which should be used for the mutation.
+     * @param ctrl Persistence controller to use.
+     */
+    public BaseMutation(String table, PersistenceController ctrl) {
+        if (table == null || table.trim().isEmpty())
+            throw new IllegalArgumentException("Table name should be specified");
+
+        if (ctrl == null)
+            throw new IllegalArgumentException("Persistence controller should be specified");
+
+        this.table = table;
+        this.ctrl = ctrl;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getTable() {
+        return table;
+    }
+
+    /** {@inheritDoc} */
+    @Override public KeyValuePersistenceSettings getPersistenceSettings() {
+        return ctrl.getPersistenceSettings();
+    }
+
+    /**
+     * Service method to get persistence controller instance
+     *
+     * @return Persistence controller to use for the mutation
+     */
+    protected PersistenceController controller() {
+        return ctrl;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/DeleteMutation.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/DeleteMutation.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/DeleteMutation.java
new file mode 100644
index 0000000..79c0bfe
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/DeleteMutation.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.session.transaction;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.PreparedStatement;
+import org.apache.ignite.cache.store.cassandra.persistence.PersistenceController;
+
+/**
+ * Mutation which deletes object from Cassandra.
+ */
+public class DeleteMutation extends BaseMutation {
+    /** Ignite cache key of the object which should be deleted. */
+    private final Object key;
+
+    /**
+     * Creates instance of delete mutation operation.
+     *
+     * @param key Ignite cache key of the object which should be deleted.
+     * @param table Cassandra table which should be used for the mutation.
+     * @param ctrl Persistence controller to use.
+     */
+    public DeleteMutation(Object key, String table, PersistenceController ctrl) {
+        super(table, ctrl);
+        this.key = key;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean tableExistenceRequired() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getStatement() {
+        return controller().getDeleteStatement(getTable());
+    }
+
+    /** {@inheritDoc} */
+    @Override public BoundStatement bindStatement(PreparedStatement statement) {
+        return controller().bindKey(statement, key);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/Mutation.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/Mutation.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/Mutation.java
new file mode 100644
index 0000000..cb014f8
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/Mutation.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.session.transaction;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.PreparedStatement;
+import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings;
+
+/**
+ * Provides information about particular mutation operation performed withing transaction.
+ */
+public interface Mutation {
+    /**
+     * Cassandra table to use for an operation.
+     *
+     * @return Table name.
+     */
+    public String getTable();
+
+    /**
+     * Indicates if Cassandra tables existence is required for this operation.
+     *
+     * @return {@code true} true if table existence required.
+     */
+    public boolean tableExistenceRequired();
+
+    /**
+     *  Returns Ignite cache key/value persistence settings.
+     *
+     * @return persistence settings.
+     */
+    public KeyValuePersistenceSettings getPersistenceSettings();
+
+    /**
+     * Returns unbind CLQ statement for to be executed.
+     *
+     * @return Unbind CQL statement.
+     */
+    public String getStatement();
+
+    /**
+     * Binds prepared statement to current Cassandra session.
+     *
+     * @param statement Statement.
+     * @param obj Parameters for statement binding.
+     * @return Bounded statement.
+     */
+    public BoundStatement bindStatement(PreparedStatement statement);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/WriteMutation.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/WriteMutation.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/WriteMutation.java
new file mode 100644
index 0000000..3c74378
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/WriteMutation.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.session.transaction;
+
+import javax.cache.Cache;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.PreparedStatement;
+
+import org.apache.ignite.cache.store.cassandra.persistence.PersistenceController;
+
+/**
+ * Mutation which writes(inserts) object into Cassandra.
+ */
+public class WriteMutation extends BaseMutation {
+    /** Ignite cache entry to be inserted into Cassandra. */
+    private final Cache.Entry entry;
+
+    /**
+     * Creates instance of delete mutation operation.
+     *
+     * @param entry Ignite cache entry to be inserted into Cassandra.
+     * @param table Cassandra table which should be used for the mutation.
+     * @param ctrl Persistence controller to use.
+     */
+    public WriteMutation(Cache.Entry entry, String table, PersistenceController ctrl) {
+        super(table, ctrl);
+        this.entry = entry;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean tableExistenceRequired() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getStatement() {
+        return controller().getWriteStatement(getTable());
+    }
+
+    /** {@inheritDoc} */
+    @Override public BoundStatement bindStatement(PreparedStatement statement) {
+        return controller().bindKeyValue(statement, entry.getKey(), entry.getValue());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/package-info.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/package-info.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/package-info.java
new file mode 100644
index 0000000..7141845
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains mutations implementation, to store changes made inside Ignite transaction
+ */
+package org.apache.ignite.cache.store.cassandra.session.transaction;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraDirectPersistenceTest.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraDirectPersistenceTest.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraDirectPersistenceTest.java
index 9974898..f9e9649 100644
--- a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraDirectPersistenceTest.java
+++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraDirectPersistenceTest.java
@@ -18,14 +18,21 @@
 package org.apache.ignite.tests;
 
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import org.apache.ignite.cache.store.CacheStore;
 import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.tests.pojos.Person;
 import org.apache.ignite.tests.pojos.PersonId;
+import org.apache.ignite.tests.pojos.Product;
+import org.apache.ignite.tests.pojos.ProductOrder;
 import org.apache.ignite.tests.utils.CacheStoreHelper;
 import org.apache.ignite.tests.utils.CassandraHelper;
+import org.apache.ignite.tests.utils.TestCacheSession;
+import org.apache.ignite.tests.utils.TestTransaction;
 import org.apache.ignite.tests.utils.TestsHelper;
+import org.apache.ignite.transactions.Transaction;
 import org.apache.log4j.Logger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -113,31 +120,31 @@ public class CassandraDirectPersistenceTest {
 
         LOGGER.info("Running PRIMITIVE strategy write tests");
 
-        LOGGER.info("Running single operation write tests");
+        LOGGER.info("Running single write operation tests");
         store1.write(longEntries.iterator().next());
         store2.write(strEntries.iterator().next());
-        LOGGER.info("Single operation write tests passed");
+        LOGGER.info("Single write operation tests passed");
 
-        LOGGER.info("Running bulk operation write tests");
+        LOGGER.info("Running bulk write operation tests");
         store1.writeAll(longEntries);
         store2.writeAll(strEntries);
-        LOGGER.info("Bulk operation write tests passed");
+        LOGGER.info("Bulk write operation tests passed");
 
         LOGGER.info("PRIMITIVE strategy write tests passed");
 
         LOGGER.info("Running PRIMITIVE strategy read tests");
 
-        LOGGER.info("Running single operation read tests");
+        LOGGER.info("Running single read operation tests");
 
         LOGGER.info("Running real keys read tests");
 
         Long longVal = (Long)store1.load(longEntries.iterator().next().getKey());
         if (!longEntries.iterator().next().getValue().equals(longVal))
-            throw new RuntimeException("Long values was incorrectly deserialized from Cassandra");
+            throw new RuntimeException("Long values were incorrectly deserialized from Cassandra");
 
         String strVal = (String)store2.load(strEntries.iterator().next().getKey());
         if (!strEntries.iterator().next().getValue().equals(strVal))
-            throw new RuntimeException("String values was incorrectly deserialized from Cassandra");
+            throw new RuntimeException("String values were incorrectly deserialized from Cassandra");
 
         LOGGER.info("Running fake keys read tests");
 
@@ -149,31 +156,31 @@ public class CassandraDirectPersistenceTest {
         if (strVal != null)
             throw new RuntimeException("String value with fake key '-1' was found in Cassandra");
 
-        LOGGER.info("Single operation read tests passed");
+        LOGGER.info("Single read operation tests passed");
 
-        LOGGER.info("Running bulk operation read tests");
+        LOGGER.info("Running bulk read operation tests");
 
         LOGGER.info("Running real keys read tests");
 
         Map longValues = store1.loadAll(TestsHelper.getKeys(longEntries));
         if (!TestsHelper.checkCollectionsEqual(longValues, longEntries))
-            throw new RuntimeException("Long values was incorrectly deserialized from Cassandra");
+            throw new RuntimeException("Long values were incorrectly deserialized from Cassandra");
 
         Map strValues = store2.loadAll(TestsHelper.getKeys(strEntries));
         if (!TestsHelper.checkCollectionsEqual(strValues, strEntries))
-            throw new RuntimeException("String values was incorrectly deserialized from Cassandra");
+            throw new RuntimeException("String values were incorrectly deserialized from Cassandra");
 
         LOGGER.info("Running fake keys read tests");
 
         longValues = store1.loadAll(fakeLongKeys);
         if (!TestsHelper.checkCollectionsEqual(longValues, longEntries))
-            throw new RuntimeException("Long values was incorrectly deserialized from Cassandra");
+            throw new RuntimeException("Long values were incorrectly deserialized from Cassandra");
 
         strValues = store2.loadAll(fakeStrKeys);
         if (!TestsHelper.checkCollectionsEqual(strValues, strEntries))
-            throw new RuntimeException("String values was incorrectly deserialized from Cassandra");
+            throw new RuntimeException("String values were incorrectly deserialized from Cassandra");
 
-        LOGGER.info("Bulk operation read tests passed");
+        LOGGER.info("Bulk read operation tests passed");
 
         LOGGER.info("PRIMITIVE strategy read tests passed");
 
@@ -219,53 +226,53 @@ public class CassandraDirectPersistenceTest {
 
         LOGGER.info("Running BLOB strategy write tests");
 
-        LOGGER.info("Running single operation write tests");
+        LOGGER.info("Running single write operation tests");
         store1.write(longEntries.iterator().next());
         store2.write(personEntries.iterator().next());
         store3.write(personEntries.iterator().next());
-        LOGGER.info("Single operation write tests passed");
+        LOGGER.info("Single write operation tests passed");
 
-        LOGGER.info("Running bulk operation write tests");
+        LOGGER.info("Running bulk write operation tests");
         store1.writeAll(longEntries);
         store2.writeAll(personEntries);
         store3.writeAll(personEntries);
-        LOGGER.info("Bulk operation write tests passed");
+        LOGGER.info("Bulk write operation tests passed");
 
         LOGGER.info("BLOB strategy write tests passed");
 
         LOGGER.info("Running BLOB strategy read tests");
 
-        LOGGER.info("Running single operation read tests");
+        LOGGER.info("Running single read operation tests");
 
         Long longVal = (Long)store1.load(longEntries.iterator().next().getKey());
         if (!longEntries.iterator().next().getValue().equals(longVal))
-            throw new RuntimeException("Long values was incorrectly deserialized from Cassandra");
+            throw new RuntimeException("Long values were incorrectly deserialized from Cassandra");
 
         Person personVal = (Person)store2.load(personEntries.iterator().next().getKey());
         if (!personEntries.iterator().next().getValue().equals(personVal))
-            throw new RuntimeException("Person values was incorrectly deserialized from Cassandra");
+            throw new RuntimeException("Person values were incorrectly deserialized from Cassandra");
 
         personVal = (Person)store3.load(personEntries.iterator().next().getKey());
         if (!personEntries.iterator().next().getValue().equals(personVal))
-            throw new RuntimeException("Person values was incorrectly deserialized from Cassandra");
+            throw new RuntimeException("Person values were incorrectly deserialized from Cassandra");
 
-        LOGGER.info("Single operation read tests passed");
+        LOGGER.info("Single read operation tests passed");
 
-        LOGGER.info("Running bulk operation read tests");
+        LOGGER.info("Running bulk read operation tests");
 
         Map longValues = store1.loadAll(TestsHelper.getKeys(longEntries));
         if (!TestsHelper.checkCollectionsEqual(longValues, longEntries))
-            throw new RuntimeException("Long values was incorrectly deserialized from Cassandra");
+            throw new RuntimeException("Long values were incorrectly deserialized from Cassandra");
 
         Map personValues = store2.loadAll(TestsHelper.getKeys(personEntries));
         if (!TestsHelper.checkPersonCollectionsEqual(personValues, personEntries, false))
-            throw new RuntimeException("Person values was incorrectly deserialized from Cassandra");
+            throw new RuntimeException("Person values were incorrectly deserialized from Cassandra");
 
         personValues = store3.loadAll(TestsHelper.getKeys(personEntries));
         if (!TestsHelper.checkPersonCollectionsEqual(personValues, personEntries, false))
-            throw new RuntimeException("Person values was incorrectly deserialized from Cassandra");
+            throw new RuntimeException("Person values were incorrectly deserialized from Cassandra");
 
-        LOGGER.info("Bulk operation read tests passed");
+        LOGGER.info("Bulk read operation tests passed");
 
         LOGGER.info("BLOB strategy read tests passed");
 
@@ -303,69 +310,99 @@ public class CassandraDirectPersistenceTest {
             new ClassPathResource("org/apache/ignite/tests/persistence/pojo/persistence-settings-4.xml"),
             CassandraHelper.getAdminDataSrc());
 
+        CacheStore productStore = CacheStoreHelper.createCacheStore("product",
+            new ClassPathResource("org/apache/ignite/tests/persistence/pojo/product.xml"),
+            CassandraHelper.getAdminDataSrc());
+
+        CacheStore orderStore = CacheStoreHelper.createCacheStore("order",
+            new ClassPathResource("org/apache/ignite/tests/persistence/pojo/order.xml"),
+            CassandraHelper.getAdminDataSrc());
+
         Collection<CacheEntryImpl<Long, Person>> entries1 = TestsHelper.generateLongsPersonsEntries();
         Collection<CacheEntryImpl<PersonId, Person>> entries2 = TestsHelper.generatePersonIdsPersonsEntries();
         Collection<CacheEntryImpl<PersonId, Person>> entries3 = TestsHelper.generatePersonIdsPersonsEntries();
+        Collection<CacheEntryImpl<Long, Product>> productEntries = TestsHelper.generateProductEntries();
+        Collection<CacheEntryImpl<Long, ProductOrder>> orderEntries = TestsHelper.generateOrderEntries();
 
         LOGGER.info("Running POJO strategy write tests");
 
-        LOGGER.info("Running single operation write tests");
+        LOGGER.info("Running single write operation tests");
         store1.write(entries1.iterator().next());
         store2.write(entries2.iterator().next());
         store3.write(entries3.iterator().next());
         store4.write(entries3.iterator().next());
-        LOGGER.info("Single operation write tests passed");
+        productStore.write(productEntries.iterator().next());
+        orderStore.write(orderEntries.iterator().next());
+        LOGGER.info("Single write operation tests passed");
 
-        LOGGER.info("Running bulk operation write tests");
+        LOGGER.info("Running bulk write operation tests");
         store1.writeAll(entries1);
         store2.writeAll(entries2);
         store3.writeAll(entries3);
         store4.writeAll(entries3);
-        LOGGER.info("Bulk operation write tests passed");
+        productStore.writeAll(productEntries);
+        orderStore.writeAll(orderEntries);
+        LOGGER.info("Bulk write operation tests passed");
 
         LOGGER.info("POJO strategy write tests passed");
 
         LOGGER.info("Running POJO strategy read tests");
 
-        LOGGER.info("Running single operation read tests");
+        LOGGER.info("Running single read operation tests");
 
         Person person = (Person)store1.load(entries1.iterator().next().getKey());
         if (!entries1.iterator().next().getValue().equalsPrimitiveFields(person))
-            throw new RuntimeException("Person values was incorrectly deserialized from Cassandra");
+            throw new RuntimeException("Person values were incorrectly deserialized from Cassandra");
 
         person = (Person)store2.load(entries2.iterator().next().getKey());
         if (!entries2.iterator().next().getValue().equalsPrimitiveFields(person))
-            throw new RuntimeException("Person values was incorrectly deserialized from Cassandra");
+            throw new RuntimeException("Person values were incorrectly deserialized from Cassandra");
 
         person = (Person)store3.load(entries3.iterator().next().getKey());
         if (!entries3.iterator().next().getValue().equals(person))
-            throw new RuntimeException("Person values was incorrectly deserialized from Cassandra");
+            throw new RuntimeException("Person values were incorrectly deserialized from Cassandra");
 
         person = (Person)store4.load(entries3.iterator().next().getKey());
         if (!entries3.iterator().next().getValue().equals(person))
-            throw new RuntimeException("Person values was incorrectly deserialized from Cassandra");
+            throw new RuntimeException("Person values were incorrectly deserialized from Cassandra");
 
-        LOGGER.info("Single operation read tests passed");
+        Product product = (Product)productStore.load(productEntries.iterator().next().getKey());
+        if (!productEntries.iterator().next().getValue().equals(product))
+            throw new RuntimeException("Product values were incorrectly deserialized from Cassandra");
 
-        LOGGER.info("Running bulk operation read tests");
+        ProductOrder order = (ProductOrder)orderStore.load(orderEntries.iterator().next().getKey());
+        if (!orderEntries.iterator().next().getValue().equals(order))
+            throw new RuntimeException("Order values were incorrectly deserialized from Cassandra");
+
+        LOGGER.info("Single read operation tests passed");
+
+        LOGGER.info("Running bulk read operation tests");
 
         Map persons = store1.loadAll(TestsHelper.getKeys(entries1));
         if (!TestsHelper.checkPersonCollectionsEqual(persons, entries1, true))
-            throw new RuntimeException("Person values was incorrectly deserialized from Cassandra");
+            throw new RuntimeException("Person values were incorrectly deserialized from Cassandra");
 
         persons = store2.loadAll(TestsHelper.getKeys(entries2));
         if (!TestsHelper.checkPersonCollectionsEqual(persons, entries2, true))
-            throw new RuntimeException("Person values was incorrectly deserialized from Cassandra");
+            throw new RuntimeException("Person values were incorrectly deserialized from Cassandra");
 
         persons = store3.loadAll(TestsHelper.getKeys(entries3));
         if (!TestsHelper.checkPersonCollectionsEqual(persons, entries3, false))
-            throw new RuntimeException("Person values was incorrectly deserialized from Cassandra");
+            throw new RuntimeException("Person values were incorrectly deserialized from Cassandra");
 
         persons = store4.loadAll(TestsHelper.getKeys(entries3));
         if (!TestsHelper.checkPersonCollectionsEqual(persons, entries3, false))
-            throw new RuntimeException("Person values was incorrectly deserialized from Cassandra");
+            throw new RuntimeException("Person values were incorrectly deserialized from Cassandra");
+
+        Map products = productStore.loadAll(TestsHelper.getKeys(productEntries));
+        if (!TestsHelper.checkProductCollectionsEqual(products, productEntries))
+            throw new RuntimeException("Product values were incorrectly deserialized from Cassandra");
+
+        Map orders = orderStore.loadAll(TestsHelper.getKeys(orderEntries));
+        if (!TestsHelper.checkOrderCollectionsEqual(orders, orderEntries))
+            throw new RuntimeException("Order values were incorrectly deserialized from Cassandra");
 
-        LOGGER.info("Bulk operation read tests passed");
+        LOGGER.info("Bulk read operation tests passed");
 
         LOGGER.info("POJO strategy read tests passed");
 
@@ -383,6 +420,277 @@ public class CassandraDirectPersistenceTest {
         store4.delete(entries3.iterator().next().getKey());
         store4.deleteAll(TestsHelper.getKeys(entries3));
 
+        productStore.delete(productEntries.iterator().next().getKey());
+        productStore.deleteAll(TestsHelper.getKeys(productEntries));
+
+        orderStore.delete(orderEntries.iterator().next().getKey());
+        orderStore.deleteAll(TestsHelper.getKeys(orderEntries));
+
         LOGGER.info("POJO strategy delete tests passed");
     }
+
+    /** */
+    @Test
+    @SuppressWarnings("unchecked")
+    public void pojoStrategyTransactionTest() {
+        Map<Object, Object> sessionProps = U.newHashMap(1);
+        Transaction sessionTx = new TestTransaction();
+
+        CacheStore productStore = CacheStoreHelper.createCacheStore("product",
+            new ClassPathResource("org/apache/ignite/tests/persistence/pojo/product.xml"),
+            CassandraHelper.getAdminDataSrc(), new TestCacheSession("product", sessionTx, sessionProps));
+
+        CacheStore orderStore = CacheStoreHelper.createCacheStore("order",
+            new ClassPathResource("org/apache/ignite/tests/persistence/pojo/order.xml"),
+            CassandraHelper.getAdminDataSrc(), new TestCacheSession("order", sessionTx, sessionProps));
+
+        List<CacheEntryImpl<Long, Product>> productEntries = TestsHelper.generateProductEntries();
+        Map<Long, List<CacheEntryImpl<Long, ProductOrder>>> ordersPerProduct =
+                TestsHelper.generateOrdersPerProductEntries(productEntries, 2);
+
+        Collection<Long> productIds =  TestsHelper.getProductIds(productEntries);
+        Collection<Long> orderIds =  TestsHelper.getOrderIds(ordersPerProduct);
+
+        LOGGER.info("Running POJO strategy transaction write tests");
+
+        LOGGER.info("Running single write operation tests");
+
+        CassandraHelper.dropTestKeyspaces();
+
+        Product product = productEntries.iterator().next().getValue();
+        ProductOrder order = ordersPerProduct.get(product.getId()).iterator().next().getValue();
+
+        productStore.write(productEntries.iterator().next());
+        orderStore.write(ordersPerProduct.get(product.getId()).iterator().next());
+
+        if (productStore.load(product.getId()) != null || orderStore.load(order.getId()) != null) {
+            throw new RuntimeException("Single write operation test failed. Transaction wasn't committed yet, but " +
+                    "objects were already persisted into Cassandra");
+        }
+
+        Map<Long, Product> products = (Map<Long, Product>)productStore.loadAll(productIds);
+        Map<Long, ProductOrder> orders = (Map<Long, ProductOrder>)orderStore.loadAll(orderIds);
+
+        if ((products != null && !products.isEmpty()) || (orders != null && !orders.isEmpty())) {
+            throw new RuntimeException("Single write operation test failed. Transaction wasn't committed yet, but " +
+                    "objects were already persisted into Cassandra");
+        }
+
+        //noinspection deprecation
+        orderStore.sessionEnd(true);
+        //noinspection deprecation
+        productStore.sessionEnd(true);
+
+        Product product1 = (Product)productStore.load(product.getId());
+        ProductOrder order1 = (ProductOrder)orderStore.load(order.getId());
+
+        if (product1 == null || order1 == null) {
+            throw new RuntimeException("Single write operation test failed. Transaction was committed, but " +
+                    "no objects were persisted into Cassandra");
+        }
+
+        if (!product.equals(product1) || !order.equals(order1)) {
+            throw new RuntimeException("Single write operation test failed. Transaction was committed, but " +
+                    "objects were incorrectly persisted/loaded to/from Cassandra");
+        }
+
+        products = (Map<Long, Product>)productStore.loadAll(productIds);
+        orders = (Map<Long, ProductOrder>)orderStore.loadAll(orderIds);
+
+        if (products == null || products.isEmpty() || orders == null || orders.isEmpty()) {
+            throw new RuntimeException("Single write operation test failed. Transaction was committed, but " +
+                    "no objects were persisted into Cassandra");
+        }
+
+        if (products.size() > 1 || orders.size() > 1) {
+            throw new RuntimeException("Single write operation test failed. There were committed more objects " +
+                    "into Cassandra than expected");
+        }
+
+        product1 = products.entrySet().iterator().next().getValue();
+        order1 = orders.entrySet().iterator().next().getValue();
+
+        if (!product.equals(product1) || !order.equals(order1)) {
+            throw new RuntimeException("Single write operation test failed. Transaction was committed, but " +
+                    "objects were incorrectly persisted/loaded to/from Cassandra");
+        }
+
+        LOGGER.info("Single write operation tests passed");
+
+        LOGGER.info("Running bulk write operation tests");
+
+        CassandraHelper.dropTestKeyspaces();
+        sessionProps.clear();
+
+        productStore.writeAll(productEntries);
+
+        for (Long productId : ordersPerProduct.keySet())
+            orderStore.writeAll(ordersPerProduct.get(productId));
+
+        for (Long productId : productIds) {
+            if (productStore.load(productId) != null) {
+                throw new RuntimeException("Bulk write operation test failed. Transaction wasn't committed yet, but " +
+                        "objects were already persisted into Cassandra");
+            }
+        }
+
+        for (Long orderId : orderIds) {
+            if (orderStore.load(orderId) != null) {
+                throw new RuntimeException("Bulk write operation test failed. Transaction wasn't committed yet, but " +
+                        "objects were already persisted into Cassandra");
+            }
+        }
+
+        products = (Map<Long, Product>)productStore.loadAll(productIds);
+        orders = (Map<Long, ProductOrder>)orderStore.loadAll(orderIds);
+
+        if ((products != null && !products.isEmpty()) || (orders != null && !orders.isEmpty())) {
+            throw new RuntimeException("Bulk write operation test failed. Transaction wasn't committed yet, but " +
+                    "objects were already persisted into Cassandra");
+        }
+
+        //noinspection deprecation
+        productStore.sessionEnd(true);
+        //noinspection deprecation
+        orderStore.sessionEnd(true);
+
+        for (CacheEntryImpl<Long, Product> entry : productEntries) {
+            product = (Product)productStore.load(entry.getKey());
+
+            if (!entry.getValue().equals(product)) {
+                throw new RuntimeException("Bulk write operation test failed. Transaction was committed, but " +
+                        "not all objects were persisted into Cassandra");
+            }
+        }
+
+        for (Long productId : ordersPerProduct.keySet()) {
+            for (CacheEntryImpl<Long, ProductOrder> entry : ordersPerProduct.get(productId)) {
+                order = (ProductOrder)orderStore.load(entry.getKey());
+
+                if (!entry.getValue().equals(order)) {
+                    throw new RuntimeException("Bulk write operation test failed. Transaction was committed, but " +
+                            "not all objects were persisted into Cassandra");
+                }
+            }
+        }
+
+        products = (Map<Long, Product>)productStore.loadAll(productIds);
+        orders = (Map<Long, ProductOrder>)orderStore.loadAll(orderIds);
+
+        if (products == null || products.isEmpty() || orders == null || orders.isEmpty()) {
+            throw new RuntimeException("Bulk write operation test failed. Transaction was committed, but " +
+                    "no objects were persisted into Cassandra");
+        }
+
+        if (products.size() < productIds.size() || orders.size() < orderIds.size()) {
+            throw new RuntimeException("Bulk write operation test failed. There were committed less objects " +
+                    "into Cassandra than expected");
+        }
+
+        if (products.size() > productIds.size() || orders.size() > orderIds.size()) {
+            throw new RuntimeException("Bulk write operation test failed. There were committed more objects " +
+                    "into Cassandra than expected");
+        }
+
+        for (CacheEntryImpl<Long, Product> entry : productEntries) {
+            product = products.get(entry.getKey());
+
+            if (!entry.getValue().equals(product)) {
+                throw new RuntimeException("Bulk write operation test failed. Transaction was committed, but " +
+                        "some objects were incorrectly persisted/loaded to/from Cassandra");
+            }
+        }
+
+        for (Long productId : ordersPerProduct.keySet()) {
+            for (CacheEntryImpl<Long, ProductOrder> entry : ordersPerProduct.get(productId)) {
+                order = orders.get(entry.getKey());
+
+                if (!entry.getValue().equals(order)) {
+                    throw new RuntimeException("Bulk write operation test failed. Transaction was committed, but " +
+                            "some objects were incorrectly persisted/loaded to/from Cassandra");
+                }
+            }
+        }
+
+        LOGGER.info("Bulk write operation tests passed");
+
+        LOGGER.info("POJO strategy transaction write tests passed");
+
+        LOGGER.info("Running POJO strategy transaction delete tests");
+
+        LOGGER.info("Running single delete tests");
+
+        sessionProps.clear();
+
+        Product deletedProduct = productEntries.remove(0).getValue();
+        ProductOrder deletedOrder = ordersPerProduct.get(deletedProduct.getId()).remove(0).getValue();
+
+        productStore.delete(deletedProduct.getId());
+        orderStore.delete(deletedOrder.getId());
+
+        if (productStore.load(deletedProduct.getId()) == null || orderStore.load(deletedOrder.getId()) == null) {
+            throw new RuntimeException("Single delete operation test failed. Transaction wasn't committed yet, but " +
+                    "objects were already deleted from Cassandra");
+        }
+
+        products = (Map<Long, Product>)productStore.loadAll(productIds);
+        orders = (Map<Long, ProductOrder>)orderStore.loadAll(orderIds);
+
+        if (products.size() != productIds.size() || orders.size() != orderIds.size()) {
+            throw new RuntimeException("Single delete operation test failed. Transaction wasn't committed yet, but " +
+                    "objects were already deleted from Cassandra");
+        }
+
+        //noinspection deprecation
+        productStore.sessionEnd(true);
+        //noinspection deprecation
+        orderStore.sessionEnd(true);
+
+        if (productStore.load(deletedProduct.getId()) != null || orderStore.load(deletedOrder.getId()) != null) {
+            throw new RuntimeException("Single delete operation test failed. Transaction was committed, but " +
+                    "objects were not deleted from Cassandra");
+        }
+
+        products = (Map<Long, Product>)productStore.loadAll(productIds);
+        orders = (Map<Long, ProductOrder>)orderStore.loadAll(orderIds);
+
+        if (products.get(deletedProduct.getId()) != null || orders.get(deletedOrder.getId()) != null) {
+            throw new RuntimeException("Single delete operation test failed. Transaction was committed, but " +
+                    "objects were not deleted from Cassandra");
+        }
+
+        LOGGER.info("Single delete tests passed");
+
+        LOGGER.info("Running bulk delete tests");
+
+        sessionProps.clear();
+
+        productStore.deleteAll(productIds);
+        orderStore.deleteAll(orderIds);
+
+        products = (Map<Long, Product>)productStore.loadAll(productIds);
+        orders = (Map<Long, ProductOrder>)orderStore.loadAll(orderIds);
+
+        if (products == null || products.isEmpty() || orders == null || orders.isEmpty()) {
+            throw new RuntimeException("Bulk delete operation test failed. Transaction wasn't committed yet, but " +
+                    "objects were already deleted from Cassandra");
+        }
+
+        //noinspection deprecation
+        orderStore.sessionEnd(true);
+        //noinspection deprecation
+        productStore.sessionEnd(true);
+
+        products = (Map<Long, Product>)productStore.loadAll(productIds);
+        orders = (Map<Long, ProductOrder>)orderStore.loadAll(orderIds);
+
+        if ((products != null && !products.isEmpty()) || (orders != null && !orders.isEmpty())) {
+            throw new RuntimeException("Bulk delete operation test failed. Transaction was committed, but " +
+                    "objects were not deleted from Cassandra");
+        }
+
+        LOGGER.info("Bulk delete tests passed");
+
+        LOGGER.info("POJO strategy transaction delete tests passed");
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraLocalServer.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraLocalServer.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraLocalServer.java
new file mode 100644
index 0000000..fc54e5b
--- /dev/null
+++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraLocalServer.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests;
+
+import org.apache.ignite.tests.utils.CassandraHelper;
+import org.apache.log4j.Logger;
+
+/**
+ * Simple helper class to run Cassandra on localhost
+ */
+public class CassandraLocalServer {
+    /** */
+    private static final Logger LOGGER = Logger.getLogger(CassandraLocalServer.class.getName());
+
+    /** */
+    public static void main(String[] args) {
+        try {
+            CassandraHelper.startEmbeddedCassandra(LOGGER);
+        }
+        catch (Throwable e) {
+            throw new RuntimeException("Failed to start embedded Cassandra instance", e);
+        }
+
+        LOGGER.info("Testing admin connection to Cassandra");
+        CassandraHelper.testAdminConnection();
+
+        LOGGER.info("Testing regular connection to Cassandra");
+        CassandraHelper.testRegularConnection();
+
+        LOGGER.info("Dropping all artifacts from previous tests execution session");
+        CassandraHelper.dropTestKeyspaces();
+
+        while (true) {
+            try {
+                System.out.println("Cassandra server running");
+                Thread.sleep(10000);
+            }
+            catch (Throwable e) {
+                throw new RuntimeException("Cassandra server terminated", e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/3b8aca64/modules/cassandra/store/src/test/java/org/apache/ignite/tests/DDLGeneratorTest.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/DDLGeneratorTest.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/DDLGeneratorTest.java
index 43b6d3c..6465580 100644
--- a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/DDLGeneratorTest.java
+++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/DDLGeneratorTest.java
@@ -25,33 +25,30 @@ import org.junit.Test;
  * DDLGenerator test.
  */
 public class DDLGeneratorTest {
-    private static final String URL1 = "org/apache/ignite/tests/persistence/primitive/persistence-settings-1.xml";
-    private static final String URL2 = "org/apache/ignite/tests/persistence/pojo/persistence-settings-3.xml";
-    private static final String URL3 = "org/apache/ignite/tests/persistence/pojo/persistence-settings-4.xml";
+    private static final String[] RESOURCES = new String[] {
+        "org/apache/ignite/tests/persistence/primitive/persistence-settings-1.xml",
+        "org/apache/ignite/tests/persistence/pojo/persistence-settings-3.xml",
+        "org/apache/ignite/tests/persistence/pojo/persistence-settings-4.xml",
+        "org/apache/ignite/tests/persistence/pojo/product.xml",
+        "org/apache/ignite/tests/persistence/pojo/order.xml"
+    };
 
     @Test
     @SuppressWarnings("unchecked")
     /** */
     public void generatorTest() {
-        ClassLoader clsLdr = DDLGeneratorTest.class.getClassLoader();
-
-        URL url1 = clsLdr.getResource(URL1);
-        if (url1 == null)
-            throw new IllegalStateException("Failed to find resource: " + URL1);
+        String[] files = new String[RESOURCES.length];
 
-        URL url2 = clsLdr.getResource(URL2);
-        if (url2 == null)
-            throw new IllegalStateException("Failed to find resource: " + URL2);
+        ClassLoader clsLdr = DDLGeneratorTest.class.getClassLoader();
 
-        URL url3 = clsLdr.getResource(URL3);
-        if (url3 == null)
-            throw new IllegalStateException("Failed to find resource: " + URL3);
+        for (int i = 0; i < RESOURCES.length; i++) {
+            URL url = clsLdr.getResource(RESOURCES[i]);
+            if (url == null)
+                throw new IllegalStateException("Failed to find resource: " + RESOURCES[i]);
 
-        String file1 = url1.getFile();
-        String file2 = url2.getFile();
-        String file3 = url3.getFile();
+            files[i] = url.getFile();
+        }
 
-        DDLGenerator.main(new String[]{file1, file2, file3});
+        DDLGenerator.main(files);
     }
-
 }


[3/3] ignite git commit: IGNITE-3609 Review.

Posted by ak...@apache.org.
IGNITE-3609 Review.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e7f35328
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e7f35328
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e7f35328

Branch: refs/heads/master
Commit: e7f353283af2792a2ff0fe8c744b5d2308ece366
Parents: 3b8aca6
Author: Alexey Kuznetsov <ak...@apache.org>
Authored: Fri Sep 30 14:51:54 2016 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Fri Sep 30 14:51:54 2016 +0700

----------------------------------------------------------------------
 .../store/cassandra/common/RandomSleeper.java    |  2 +-
 .../persistence/PersistenceController.java       |  8 +++++++-
 .../store/cassandra/persistence/PojoField.java   | 10 ++++++----
 .../cassandra/persistence/PojoKeyField.java      | 10 +++-------
 .../cassandra/persistence/PojoValueField.java    | 19 ++++++-------------
 .../cassandra/session/CassandraSessionImpl.java  |  4 ++--
 .../cassandra/session/pool/SessionPool.java      |  2 +-
 .../cassandra/session/transaction/Mutation.java  |  1 -
 .../ignite/tests/CassandraLocalServer.java       |  1 +
 .../apache/ignite/tests/DDLGeneratorTest.java    |  5 ++++-
 .../apache/ignite/tests/load/IntGenerator.java   |  2 +-
 .../apache/ignite/tests/load/LoadTestDriver.java | 14 +++++++-------
 .../org/apache/ignite/tests/load/Worker.java     | 18 +++++++++---------
 .../apache/ignite/tests/pojos/ProductOrder.java  |  2 +-
 .../ignite/tests/utils/CacheStoreHelper.java     |  6 +++---
 .../ignite/tests/utils/TestTransaction.java      |  3 +--
 16 files changed, 53 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e7f35328/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/RandomSleeper.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/RandomSleeper.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/RandomSleeper.java
index 6745a16..f2e57a9 100644
--- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/RandomSleeper.java
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/common/RandomSleeper.java
@@ -43,7 +43,7 @@ public class RandomSleeper {
     private Random random = new Random(System.currentTimeMillis());
 
     /** */
-    private int summary = 0;
+    private int summary;
 
     /**
      * Creates sleeper instance.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7f35328/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceController.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceController.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceController.java
index 122f0c8..e287a4e 100644
--- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceController.java
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceController.java
@@ -166,7 +166,7 @@ public class PersistenceController {
     }
 
     /**
-     * Binds Ignite cache key object to {@link com.datastax.driver.core.PreparedStatement}.
+     * Binds Ignite cache key object to {@link PreparedStatement}.
      *
      * @param statement statement to which key object should be bind.
      * @param key key object.
@@ -347,6 +347,12 @@ public class PersistenceController {
         return new String[] {hdrWithKeyFields + statement.toString(), hdr + statement.toString()};
     }
 
+    /**
+     * @param table Table.
+     * @param template Template.
+     * @param statements Statements.
+     * @return Statement.
+     */
     private String getStatement(final String table, final String template, final Map<String, String> statements) {
         //noinspection SynchronizationOnLocalVariableOrMethodParameter
         synchronized (statements) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7f35328/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java
index 78e75a9..99b96d5 100644
--- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java
@@ -21,6 +21,7 @@ import com.datastax.driver.core.DataType;
 import com.datastax.driver.core.Row;
 import java.beans.PropertyDescriptor;
 import java.io.Serializable;
+import java.lang.reflect.Method;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.cache.store.cassandra.common.PropertyMappingHelper;
@@ -85,10 +86,11 @@ public abstract class PojoField implements Serializable {
     public PojoField(PropertyDescriptor desc) {
         this.name = desc.getName();
 
-        QuerySqlField sqlField = desc.getReadMethod() != null &&
-                desc.getReadMethod().getAnnotation(QuerySqlField.class) != null ?
-                desc.getReadMethod().getAnnotation(QuerySqlField.class) :
-                    desc.getWriteMethod() == null ? null : desc.getWriteMethod().getAnnotation(QuerySqlField.class);
+        Method rdMthd = desc.getReadMethod();
+
+        QuerySqlField sqlField = rdMthd != null && rdMthd.getAnnotation(QuerySqlField.class) != null
+            ? rdMthd.getAnnotation(QuerySqlField.class)
+            : desc.getWriteMethod() == null ? null : desc.getWriteMethod().getAnnotation(QuerySqlField.class);
 
         col = sqlField != null && sqlField.name() != null &&
             !sqlField.name().trim().isEmpty() ? sqlField.name() : name.toLowerCase();

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7f35328/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoKeyField.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoKeyField.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoKeyField.java
index 4e86d74..6f42db2 100644
--- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoKeyField.java
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoKeyField.java
@@ -40,7 +40,7 @@ public class PojoKeyField extends PojoField {
     private static final String SORT_ATTR = "sort";
 
     /** Sort order. */
-    private SortOrder sortOrder = null;
+    private SortOrder sortOrder;
 
     /**
      * Constructs Ignite cache key POJO object descriptor.
@@ -79,12 +79,8 @@ public class PojoKeyField extends PojoField {
         return sortOrder;
     }
 
-    /**
-     * Initializes descriptor from {@link QuerySqlField} annotation.
-     *
-     * @param sqlField {@link QuerySqlField} annotation.
-     */
-    protected void init(QuerySqlField sqlField) {
+    /** {@inheritDoc} */
+    @Override protected void init(QuerySqlField sqlField) {
         if (sqlField.descending())
             sortOrder = SortOrder.DESC;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7f35328/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java
index 3e636c0..fcdd408 100644
--- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java
@@ -87,16 +87,12 @@ public class PojoValueField extends PojoField {
         super(desc);
     }
 
-    /**
-     * Returns DDL for Cassandra columns corresponding to POJO field.
-     *
-     * @return columns DDL.
-     */
-    public String getColumnDDL() {
+    /** {@inheritDoc} */
+    @Override public String getColumnDDL() {
         String colDDL = super.getColumnDDL();
 
         if (isStatic != null && isStatic)
-            colDDL = colDDL + " static";
+            colDDL += " static";
 
         return colDDL;
     }
@@ -140,11 +136,8 @@ public class PojoValueField extends PojoField {
         return builder.append(";").toString();
     }
 
-    /**
-     * Initializes descriptor from {@link QuerySqlField} annotation.
-     *
-     * @param sqlField {@link QuerySqlField} annotation.
-     */
-    protected void init(QuerySqlField sqlField) {
+    /** {@inheritDoc} */
+    @Override protected void init(QuerySqlField sqlField) {
+        // No-op.
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7f35328/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
index 4857fa4..ac11686 100644
--- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/CassandraSessionImpl.java
@@ -69,7 +69,7 @@ public class CassandraSessionImpl implements CassandraSession {
     private volatile Session ses;
 
     /** Number of references to Cassandra driver session (for multithreaded environment). */
-    private volatile int refCnt = 0;
+    private volatile int refCnt;
 
     /** Storage for the session prepared statements */
     private static final Map<String, PreparedStatement> sesStatements = new HashMap<>();
@@ -748,7 +748,7 @@ public class CassandraSessionImpl implements CassandraSession {
                     catch (AlreadyExistsException ignored) {
                     }
                     catch (Throwable e) {
-                        if (!(e instanceof InvalidQueryException) || !e.getMessage().equals("Index already exists"))
+                        if (!(e instanceof InvalidQueryException) || !"Index already exists".equals(e.getMessage()))
                             throw new IgniteException(errorMsg, e);
                     }
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7f35328/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java
index fc4a907..95938bd 100644
--- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/pool/SessionPool.java
@@ -146,7 +146,7 @@ public class SessionPool {
 
         synchronized (sessions) {
             try {
-                if (sessions.size() == 0)
+                if (sessions.isEmpty())
                     return;
 
                 wrappers = new LinkedList<>();

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7f35328/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/Mutation.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/Mutation.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/Mutation.java
index cb014f8..f3fb354 100644
--- a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/Mutation.java
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/session/transaction/Mutation.java
@@ -57,7 +57,6 @@ public interface Mutation {
      * Binds prepared statement to current Cassandra session.
      *
      * @param statement Statement.
-     * @param obj Parameters for statement binding.
      * @return Bounded statement.
      */
     public BoundStatement bindStatement(PreparedStatement statement);

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7f35328/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraLocalServer.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraLocalServer.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraLocalServer.java
index fc54e5b..eea4e9e 100644
--- a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraLocalServer.java
+++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/CassandraLocalServer.java
@@ -48,6 +48,7 @@ public class CassandraLocalServer {
         while (true) {
             try {
                 System.out.println("Cassandra server running");
+
                 Thread.sleep(10000);
             }
             catch (Throwable e) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7f35328/modules/cassandra/store/src/test/java/org/apache/ignite/tests/DDLGeneratorTest.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/DDLGeneratorTest.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/DDLGeneratorTest.java
index 6465580..e982e16 100644
--- a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/DDLGeneratorTest.java
+++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/DDLGeneratorTest.java
@@ -25,6 +25,7 @@ import org.junit.Test;
  * DDLGenerator test.
  */
 public class DDLGeneratorTest {
+    /** */
     private static final String[] RESOURCES = new String[] {
         "org/apache/ignite/tests/persistence/primitive/persistence-settings-1.xml",
         "org/apache/ignite/tests/persistence/pojo/persistence-settings-3.xml",
@@ -33,9 +34,11 @@ public class DDLGeneratorTest {
         "org/apache/ignite/tests/persistence/pojo/order.xml"
     };
 
+    /**
+     * Test DDL generator.
+     */
     @Test
     @SuppressWarnings("unchecked")
-    /** */
     public void generatorTest() {
         String[] files = new String[RESOURCES.length];
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7f35328/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/IntGenerator.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/IntGenerator.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/IntGenerator.java
index a31abee..21490f6 100644
--- a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/IntGenerator.java
+++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/IntGenerator.java
@@ -26,7 +26,7 @@ public class IntGenerator implements Generator {
         long val = i / 10000;
 
         while (val > Integer.MAX_VALUE)
-            val = val / 2;
+            val /= 2;
 
         return (int)val;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7f35328/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/LoadTestDriver.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/LoadTestDriver.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/LoadTestDriver.java
index 296839d..2582007 100644
--- a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/LoadTestDriver.java
+++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/LoadTestDriver.java
@@ -74,7 +74,7 @@ public abstract class LoadTestDriver {
         }
 
         // calculates host unique prefix based on its subnet IP address
-        long hostUniqePrefix = getHostUniquePrefix();
+        long hostUniquePrefix = getHostUniquePrefix();
 
         logger().info("Load tests driver setup successfully completed");
 
@@ -87,8 +87,8 @@ public abstract class LoadTestDriver {
 
             for (int i = 0; i < TestsHelper.getLoadTestsThreadsCount(); i++) {
                 Worker worker = createWorker(clazz, cfg,
-                    hostUniqePrefix + startPosition,
-                    hostUniqePrefix + startPosition + 100000000);
+                    hostUniquePrefix + startPosition,
+                    hostUniquePrefix + startPosition + 100000000);
                 workers.add(worker);
                 worker.setName(testName + "-worker-" + i);
                 worker.start();
@@ -224,14 +224,14 @@ public abstract class LoadTestDriver {
         long part4 = Long.parseLong(parts[3]);
 
         if (part3 < 10)
-            part3 = part3 * 100;
+            part3 *= 100;
         else if (part4 < 100)
-            part3 = part3 * 10;
+            part3 *= 10;
 
         if (part4 < 10)
-            part4 = part4 * 100;
+            part4 *= 100;
         else if (part4 < 100)
-            part4 = part4 * 10;
+            part4 *= 10;
 
         return (part4 * 100000000000000L) + (part3 * 100000000000L) + Thread.currentThread().getId();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7f35328/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/Worker.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/Worker.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/Worker.java
index f4bffc7..5f3c393 100644
--- a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/Worker.java
+++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/load/Worker.java
@@ -46,31 +46,31 @@ public abstract class Worker extends Thread {
     boolean warmup = TestsHelper.getLoadTestsWarmupPeriod() != 0;
 
     /** */
-    private volatile long warmupStartTime = 0;
+    private volatile long warmupStartTime;
 
     /** */
-    private volatile long warmupFinishTime = 0;
+    private volatile long warmupFinishTime;
 
     /** */
-    private volatile long startTime = 0;
+    private volatile long startTime;
 
     /** */
-    private volatile long finishTime = 0;
+    private volatile long finishTime;
 
     /** */
-    private volatile long warmupMsgProcessed = 0;
+    private volatile long warmupMsgProcessed;
 
     /** */
-    private volatile long warmupSleepCnt = 0;
+    private volatile long warmupSleepCnt;
 
     /** */
-    private volatile long msgProcessed = 0;
+    private volatile long msgProcessed;
 
     /** */
-    private volatile long msgFailed = 0;
+    private volatile long msgFailed;
 
     /** */
-    private volatile long sleepCnt = 0;
+    private volatile long sleepCnt;
 
     /** */
     private Throwable executionError;

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7f35328/modules/cassandra/store/src/test/java/org/apache/ignite/tests/pojos/ProductOrder.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/pojos/ProductOrder.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/pojos/ProductOrder.java
index 4baee83..bafc8f3 100644
--- a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/pojos/ProductOrder.java
+++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/pojos/ProductOrder.java
@@ -67,7 +67,7 @@ public class ProductOrder {
 
         // if user ordered more than 10 items provide 5% discount
         if (amount > 10)
-            this.price = this.price * 0.95F;
+            price *= 0.95F;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7f35328/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/CacheStoreHelper.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/CacheStoreHelper.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/CacheStoreHelper.java
index 9bcda6e..ddfa111 100644
--- a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/CacheStoreHelper.java
+++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/CacheStoreHelper.java
@@ -41,19 +41,19 @@ public class CacheStoreHelper {
 
     /** */
     public static CacheStore createCacheStore(String cacheName, Resource persistenceSettings, DataSource conn,
-                                              CacheStoreSession session) {
+        CacheStoreSession session) {
         return createCacheStore(cacheName, persistenceSettings, conn, session, LOGGER);
     }
 
     /** */
     public static CacheStore createCacheStore(String cacheName, Resource persistenceSettings, DataSource conn,
-                                              Logger log) {
+        Logger log) {
         return createCacheStore(cacheName, persistenceSettings, conn, null, log);
     }
 
     /** */
     public static CacheStore createCacheStore(String cacheName, Resource persistenceSettings, DataSource conn,
-                                              CacheStoreSession session, Logger log) {
+        CacheStoreSession session, Logger log) {
         CassandraCacheStore<Integer, Integer> cacheStore =
             new CassandraCacheStore<>(conn, new KeyValuePersistenceSettings(persistenceSettings),
                 Runtime.getRuntime().availableProcessors());

http://git-wip-us.apache.org/repos/asf/ignite/blob/e7f35328/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java
index cda6715..5f3ec69 100644
--- a/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java
+++ b/modules/cassandra/store/src/test/java/org/apache/ignite/tests/utils/TestTransaction.java
@@ -36,8 +36,7 @@ public class TestTransaction implements Transaction {
     private final IgniteUuid xid = IgniteUuid.randomUuid();
 
     /** {@inheritDoc} */
-    @Nullable
-    @Override public IgniteUuid xid() {
+    @Nullable @Override public IgniteUuid xid() {
         return xid;
     }