You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2016/09/14 01:37:54 UTC
[08/20] ignite git commit: IGNITE-3172 Refactoring Ignite-Cassandra
serializers. - Fixes #956.
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyPersistenceSettings.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyPersistenceSettings.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyPersistenceSettings.java
new file mode 100644
index 0000000..393dbe4
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyPersistenceSettings.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.persistence;
+
+import java.beans.PropertyDescriptor;
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.ignite.cache.affinity.AffinityKeyMapped;
+import org.apache.ignite.cache.store.cassandra.common.PropertyMappingHelper;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+
+/**
+ * Stores persistence settings for Ignite cache key
+ */
+public class KeyPersistenceSettings extends PersistenceSettings {
+ /** Partition key XML tag. */
+ private static final String PARTITION_KEY_ELEMENT = "partitionKey";
+
+ /** Cluster key XML tag. */
+ private static final String CLUSTER_KEY_ELEMENT = "clusterKey";
+
+ /** POJO field XML tag. */
+ private static final String FIELD_ELEMENT = "field";
+
+ /** POJO fields. */
+ private List<PojoField> fields = new LinkedList<>();
+
+ /** Partition key fields. */
+ private List<PojoField> partKeyFields = new LinkedList<>();
+
+ /** Cluster key fields. */
+ private List<PojoField> clusterKeyFields = new LinkedList<>();
+
+ /**
+ * Creates key persistence settings object based on it's XML configuration.
+ *
+ * @param el XML element storing key persistence settings
+ */
+ public KeyPersistenceSettings(Element el) {
+ super(el);
+
+ if (!PersistenceStrategy.POJO.equals(getStrategy()))
+ return;
+
+ NodeList keyElem = el.getElementsByTagName(PARTITION_KEY_ELEMENT);
+
+ Element partKeysNode = keyElem != null ? (Element) keyElem.item(0) : null;
+
+ Element clusterKeysNode = el.getElementsByTagName(CLUSTER_KEY_ELEMENT) != null ?
+ (Element)el.getElementsByTagName(CLUSTER_KEY_ELEMENT).item(0) : null;
+
+ if (partKeysNode == null && clusterKeysNode != null) {
+ throw new IllegalArgumentException("It's not allowed to specify cluster key fields mapping, but " +
+ "doesn't specify partition key mappings");
+ }
+
+ partKeyFields = detectFields(partKeysNode, getPartitionKeyDescriptors());
+
+ if (partKeyFields == null || partKeyFields.isEmpty()) {
+ throw new IllegalStateException("Failed to initialize partition key fields for class '" +
+ getJavaClass().getName() + "'");
+ }
+
+ clusterKeyFields = detectFields(clusterKeysNode, getClusterKeyDescriptors(partKeyFields));
+
+ fields = new LinkedList<>();
+ fields.addAll(partKeyFields);
+ fields.addAll(clusterKeyFields);
+
+ checkDuplicates(fields);
+ }
+
+ /** {@inheritDoc} */
+ @Override public List<PojoField> getFields() {
+ return fields;
+ }
+
+ /**
+ * Returns Cassandra DDL for primary key.
+ *
+ * @return DDL statement.
+ */
+ public String getPrimaryKeyDDL() {
+ StringBuilder partKey = new StringBuilder();
+
+ List<String> cols = getPartitionKeyColumns();
+ for (String column : cols) {
+ if (partKey.length() != 0)
+ partKey.append(", ");
+
+ partKey.append(column);
+ }
+
+ StringBuilder clusterKey = new StringBuilder();
+
+ cols = getClusterKeyColumns();
+ if (cols != null) {
+ for (String column : cols) {
+ if (clusterKey.length() != 0)
+ clusterKey.append(", ");
+
+ clusterKey.append(column);
+ }
+ }
+
+ return clusterKey.length() == 0 ?
+ " primary key ((" + partKey.toString() + "))" :
+ " primary key ((" + partKey.toString() + "), " + clusterKey.toString() + ")";
+ }
+
+ /**
+ * Returns Cassandra DDL for cluster key.
+ *
+ * @return Cluster key DDL.
+ */
+ public String getClusteringDDL() {
+ StringBuilder builder = new StringBuilder();
+
+ for (PojoField field : clusterKeyFields) {
+ PojoKeyField.SortOrder sortOrder = ((PojoKeyField)field).getSortOrder();
+
+ if (sortOrder == null)
+ continue;
+
+ if (builder.length() != 0)
+ builder.append(", ");
+
+ boolean asc = PojoKeyField.SortOrder.ASC.equals(sortOrder);
+
+ builder.append(field.getColumn()).append(" ").append(asc ? "asc" : "desc");
+ }
+
+ return builder.length() == 0 ? null : "clustering order by (" + builder.toString() + ")";
+ }
+
+ /** {@inheritDoc} */
+ @Override protected String defaultColumnName() {
+ return "key";
+ }
+
+ /**
+ * Returns partition key columns of Cassandra table.
+ *
+ * @return List of column names.
+ */
+ private List<String> getPartitionKeyColumns() {
+ List<String> cols = new LinkedList<>();
+
+ if (PersistenceStrategy.BLOB.equals(getStrategy()) || PersistenceStrategy.PRIMITIVE.equals(getStrategy())) {
+ cols.add(getColumn());
+ return cols;
+ }
+
+ if (partKeyFields != null) {
+ for (PojoField field : partKeyFields)
+ cols.add(field.getColumn());
+ }
+
+ return cols;
+ }
+
+ /**
+ * Returns cluster key columns of Cassandra table.
+ *
+ * @return List of column names.
+ */
+ private List<String> getClusterKeyColumns() {
+ List<String> cols = new LinkedList<>();
+
+ if (clusterKeyFields != null) {
+ for (PojoField field : clusterKeyFields)
+ cols.add(field.getColumn());
+ }
+
+ return cols;
+ }
+
+ /**
+ * Extracts POJO fields specified in XML element.
+ *
+ * @param el XML element describing fields.
+ * @param descriptors POJO fields descriptors.
+ * @return List of {@code This} fields.
+ */
+ private List<PojoField> detectFields(Element el, List<PropertyDescriptor> descriptors) {
+ List<PojoField> list = new LinkedList<>();
+
+ if (el == null && (descriptors == null || descriptors.isEmpty()))
+ return list;
+
+ if (el == null) {
+ for (PropertyDescriptor descriptor : descriptors)
+ list.add(new PojoKeyField(descriptor));
+
+ return list;
+ }
+
+ NodeList nodes = el.getElementsByTagName(FIELD_ELEMENT);
+
+ int cnt = nodes == null ? 0 : nodes.getLength();
+
+ if (cnt == 0) {
+ throw new IllegalArgumentException("Incorrect configuration of Cassandra key persistence settings, " +
+ "no cluster key fields specified inside '" + PARTITION_KEY_ELEMENT + "/" +
+ CLUSTER_KEY_ELEMENT + "' element");
+ }
+
+ for (int i = 0; i < cnt; i++) {
+ PojoKeyField field = new PojoKeyField((Element)nodes.item(i), getJavaClass());
+
+ PropertyDescriptor desc = findPropertyDescriptor(descriptors, field.getName());
+
+ if (desc == null) {
+ throw new IllegalArgumentException("Specified POJO field '" + field.getName() +
+ "' doesn't exist in '" + getJavaClass().getName() + "' class");
+ }
+
+ list.add(field);
+ }
+
+ return list;
+ }
+
+ /**
+ * @return POJO field descriptors for partition key.
+ */
+ private List<PropertyDescriptor> getPartitionKeyDescriptors() {
+ List<PropertyDescriptor> primitivePropDescriptors = PropertyMappingHelper.getPojoPropertyDescriptors(getJavaClass(),
+ AffinityKeyMapped.class, true);
+
+ return primitivePropDescriptors != null && !primitivePropDescriptors.isEmpty() ?
+ primitivePropDescriptors :
+ PropertyMappingHelper.getPojoPropertyDescriptors(getJavaClass(), true);
+ }
+
+ /**
+ * @return POJO field descriptors for cluster key.
+ */
+ private List<PropertyDescriptor> getClusterKeyDescriptors(List<PojoField> partKeyFields) {
+ List<PropertyDescriptor> primitivePropDescriptors =
+ PropertyMappingHelper.getPojoPropertyDescriptors(getJavaClass(), true);
+
+ if (primitivePropDescriptors == null || primitivePropDescriptors.isEmpty() ||
+ partKeyFields.size() == primitivePropDescriptors.size())
+ return null;
+
+ for (PojoField field : partKeyFields) {
+ for (int i = 0; i < primitivePropDescriptors.size(); i++) {
+ if (primitivePropDescriptors.get(i).getName().equals(field.getName())) {
+ primitivePropDescriptors.remove(i);
+ break;
+ }
+ }
+ }
+
+ return primitivePropDescriptors;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyValuePersistenceSettings.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyValuePersistenceSettings.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyValuePersistenceSettings.java
new file mode 100644
index 0000000..2c43ed4
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyValuePersistenceSettings.java
@@ -0,0 +1,478 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.persistence;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Serializable;
+import java.io.StringReader;
+import java.util.LinkedList;
+import java.util.List;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.store.cassandra.common.SystemHelper;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.springframework.core.io.Resource;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.xml.sax.InputSource;
+
+/**
+ * Stores persistence settings for Ignite cache key and value
+ */
+public class KeyValuePersistenceSettings implements Serializable {
+ /**
+ * Default Cassandra keyspace options which should be used to create new keyspace.
+ * <ul>
+ * <li> <b>SimpleStrategy</b> for replication work well for single data center Cassandra cluster.<br/>
+ * If your Cassandra cluster deployed across multiple data centers it's better to use <b>NetworkTopologyStrategy</b>.
+ * </li>
+ * <li> Three replicas will be created for each data block. </li>
+ * <li> Setting DURABLE_WRITES to true specifies that all data should be written to commit log. </li>
+ * </ul>
+ */
+ private static final String DFLT_KEYSPACE_OPTIONS = "replication = {'class' : 'SimpleStrategy', " +
+ "'replication_factor' : 3} and durable_writes = true";
+
+ /** Xml attribute specifying Cassandra keyspace to use. */
+ private static final String KEYSPACE_ATTR = "keyspace";
+
+ /** Xml attribute specifying Cassandra table to use. */
+ private static final String TABLE_ATTR = "table";
+
+ /** Xml attribute specifying ttl (time to leave) for rows inserted in Cassandra. */
+ private static final String TTL_ATTR = "ttl";
+
+ /** Root xml element containing persistence settings specification. */
+ private static final String PERSISTENCE_NODE = "persistence";
+
+ /** Xml element specifying Cassandra keyspace options. */
+ private static final String KEYSPACE_OPTIONS_NODE = "keyspaceOptions";
+
+ /** Xml element specifying Cassandra table options. */
+ private static final String TABLE_OPTIONS_NODE = "tableOptions";
+
+ /** Xml element specifying Ignite cache key persistence settings. */
+ private static final String KEY_PERSISTENCE_NODE = "keyPersistence";
+
+ /** Xml element specifying Ignite cache value persistence settings. */
+ private static final String VALUE_PERSISTENCE_NODE = "valuePersistence";
+
+ /** TTL (time to leave) for rows inserted into Cassandra table {@link <a href="https://docs.datastax.com/en/cql/3.1/cql/cql_using/use_expire_c.html">Expiring data</a>}. */
+ private Integer ttl;
+
+ /** Cassandra keyspace (analog of tablespace in relational databases). */
+ private String keyspace;
+
+ /** Cassandra table. */
+ private String tbl;
+
+ /** Cassandra table creation options {@link <a href="https://docs.datastax.com/en/cql/3.0/cql/cql_reference/create_table_r.html">CREATE TABLE</a>}. */
+ private String tblOptions;
+
+ /** Cassandra keyspace creation options {@link <a href="https://docs.datastax.com/en/cql/3.0/cql/cql_reference/create_keyspace_r.html">CREATE KEYSPACE</a>}. */
+ private String keyspaceOptions = DFLT_KEYSPACE_OPTIONS;
+
+ /** Persistence settings for Ignite cache keys. */
+ private KeyPersistenceSettings keyPersistenceSettings;
+
+ /** Persistence settings for Ignite cache values. */
+ private ValuePersistenceSettings valPersistenceSettings;
+
+ /**
+ * Constructs Ignite cache key/value persistence settings.
+ *
+ * @param settings string containing xml with persistence settings for Ignite cache key/value
+ */
+ @SuppressWarnings("UnusedDeclaration")
+ public KeyValuePersistenceSettings(String settings) {
+ init(settings);
+ }
+
+ /**
+ * Constructs Ignite cache key/value persistence settings.
+ *
+ * @param settingsFile xml file with persistence settings for Ignite cache key/value
+ */
+ @SuppressWarnings("UnusedDeclaration")
+ public KeyValuePersistenceSettings(File settingsFile) {
+ InputStream in;
+
+ try {
+ in = new FileInputStream(settingsFile);
+ }
+ catch (IOException e) {
+ throw new IgniteException("Failed to get input stream for Cassandra persistence settings file: " +
+ settingsFile.getAbsolutePath(), e);
+ }
+
+ init(loadSettings(in));
+ }
+
+ /**
+ * Constructs Ignite cache key/value persistence settings.
+ *
+ * @param settingsRsrc resource containing xml with persistence settings for Ignite cache key/value
+ */
+ public KeyValuePersistenceSettings(Resource settingsRsrc) {
+ InputStream in;
+
+ try {
+ in = settingsRsrc.getInputStream();
+ }
+ catch (IOException e) {
+ throw new IgniteException("Failed to get input stream for Cassandra persistence settings resource: " + settingsRsrc, e);
+ }
+
+ init(loadSettings(in));
+ }
+
+ /**
+ * Returns ttl to use for while inserting new rows into Cassandra table.
+ *
+ * @return ttl
+ */
+ public Integer getTTL() {
+ return ttl;
+ }
+
+ /**
+ * Returns Cassandra keyspace to use.
+ *
+ * @return keyspace.
+ */
+ public String getKeyspace() {
+ return keyspace;
+ }
+
+ /**
+ * Returns Cassandra table to use.
+ *
+ * @return table.
+ */
+ public String getTable() {
+ return tbl;
+ }
+
+ /**
+ * Returns full name of Cassandra table to use (including keyspace).
+ *
+ * @return full table name in format "keyspace.table".
+ */
+ public String getTableFullName()
+ {
+ return keyspace + "." + tbl;
+ }
+
+ /**
+ * Returns persistence settings for Ignite cache keys.
+ *
+ * @return keys persistence settings.
+ */
+ public KeyPersistenceSettings getKeyPersistenceSettings() {
+ return keyPersistenceSettings;
+ }
+
+ /**
+ * Returns persistence settings for Ignite cache values.
+ *
+ * @return values persistence settings.
+ */
+ public ValuePersistenceSettings getValuePersistenceSettings() {
+ return valPersistenceSettings;
+ }
+
+ /**
+ * Returns list of POJO fields to be mapped to Cassandra table columns.
+ *
+ * @return POJO fields list.
+ */
+ @SuppressWarnings("UnusedDeclaration")
+ public List<PojoField> getFields() {
+ List<PojoField> fields = new LinkedList<>();
+
+ for (PojoField field : keyPersistenceSettings.getFields())
+ fields.add(field);
+
+ for (PojoField field : valPersistenceSettings.getFields())
+ fields.add(field);
+
+ return fields;
+ }
+
+ /**
+ * Returns list of Ignite cache key POJO fields to be mapped to Cassandra table columns.
+ *
+ * @return POJO fields list.
+ */
+ @SuppressWarnings("UnusedDeclaration")
+ public List<PojoField> getKeyFields() {
+ return keyPersistenceSettings.getFields();
+ }
+
+ /**
+ * Returns list of Ignite cache value POJO fields to be mapped to Cassandra table columns.
+ *
+ * @return POJO fields list.
+ */
+ @SuppressWarnings("UnusedDeclaration")
+ public List<PojoField> getValueFields() {
+ return valPersistenceSettings.getFields();
+ }
+
+ /**
+ * Returns DDL statement to create Cassandra keyspace.
+ *
+ * @return Keyspace DDL statement.
+ */
+ public String getKeyspaceDDLStatement() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("create keyspace if not exists ").append(keyspace);
+
+ if (keyspaceOptions != null) {
+ if (!keyspaceOptions.trim().toLowerCase().startsWith("with"))
+ builder.append("\nwith");
+
+ builder.append(" ").append(keyspaceOptions);
+ }
+
+ String statement = builder.toString().trim().replaceAll(" +", " ");
+
+ return statement.endsWith(";") ? statement : statement + ";";
+ }
+
+ /**
+ * Returns DDL statement to create Cassandra table.
+ *
+ * @return Table DDL statement.
+ */
+ public String getTableDDLStatement() {
+ String colsDDL = keyPersistenceSettings.getTableColumnsDDL() + ",\n" + valPersistenceSettings.getTableColumnsDDL();
+
+ String primaryKeyDDL = keyPersistenceSettings.getPrimaryKeyDDL();
+
+ String clusteringDDL = keyPersistenceSettings.getClusteringDDL();
+
+ String optionsDDL = tblOptions != null && !tblOptions.trim().isEmpty() ? tblOptions.trim() : "";
+
+ if (clusteringDDL != null && !clusteringDDL.isEmpty())
+ optionsDDL = optionsDDL.isEmpty() ? clusteringDDL : optionsDDL + " and " + clusteringDDL;
+
+ if (!optionsDDL.trim().isEmpty())
+ optionsDDL = optionsDDL.trim().toLowerCase().startsWith("with") ? optionsDDL.trim() : "with " + optionsDDL.trim();
+
+ StringBuilder builder = new StringBuilder();
+
+ builder.append("create table if not exists ").append(keyspace).append(".").append(tbl);
+ builder.append("\n(\n").append(colsDDL).append(",\n").append(primaryKeyDDL).append("\n)");
+
+ if (!optionsDDL.isEmpty())
+ builder.append(" \n").append(optionsDDL);
+
+ String tblDDL = builder.toString().trim().replaceAll(" +", " ");
+
+ return tblDDL.endsWith(";") ? tblDDL : tblDDL + ";";
+ }
+
+ /**
+ * Returns DDL statements to create Cassandra table secondary indexes.
+ *
+ * @return DDL statements to create secondary indexes.
+ */
+ public List<String> getIndexDDLStatements() {
+ List<String> idxDDLs = new LinkedList<>();
+
+ List<PojoField> fields = valPersistenceSettings.getFields();
+
+ for (PojoField field : fields) {
+ if (((PojoValueField)field).isIndexed())
+ idxDDLs.add(((PojoValueField)field).getIndexDDL(keyspace, tbl));
+ }
+
+ return idxDDLs;
+ }
+
+ /**
+ * Loads Ignite cache persistence settings from resource.
+ *
+ * @param in Input stream.
+ * @return String containing xml with Ignite cache persistence settings.
+ */
+ private String loadSettings(InputStream in) {
+ StringBuilder settings = new StringBuilder();
+ BufferedReader reader = null;
+
+ try {
+ reader = new BufferedReader(new InputStreamReader(in));
+
+ String line = reader.readLine();
+
+ while (line != null) {
+ if (settings.length() != 0)
+ settings.append(SystemHelper.LINE_SEPARATOR);
+
+ settings.append(line);
+
+ line = reader.readLine();
+ }
+ }
+ catch (Throwable e) {
+ throw new IgniteException("Failed to read input stream for Cassandra persistence settings", e);
+ }
+ finally {
+ U.closeQuiet(reader);
+ U.closeQuiet(in);
+ }
+
+ return settings.toString();
+ }
+
+ /**
+ * @param elem Element with data.
+ * @param attr Attribute name.
+ * @return Numeric value for specified attribute.
+ */
+ private int extractIntAttribute(Element elem, String attr) {
+ String val = elem.getAttribute(attr).trim();
+
+ try {
+ return Integer.parseInt(val);
+ }
+ catch (NumberFormatException e) {
+ throw new IllegalArgumentException("Incorrect value '" + val + "' specified for '" + attr + "' attribute");
+ }
+ }
+
+ /**
+ * Initializes persistence settings from XML string.
+ *
+ * @param settings XML string containing Ignite cache persistence settings configuration.
+ */
+ @SuppressWarnings("IfCanBeSwitch")
+ private void init(String settings) {
+ Document doc;
+
+ try {
+ DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
+ DocumentBuilder builder = factory.newDocumentBuilder();
+ doc = builder.parse(new InputSource(new StringReader(settings)));
+ }
+ catch (Throwable e) {
+ throw new IllegalArgumentException("Failed to parse persistence settings:" +
+ SystemHelper.LINE_SEPARATOR + settings, e);
+ }
+
+ Element root = doc.getDocumentElement();
+
+ if (!PERSISTENCE_NODE.equals(root.getNodeName())) {
+ throw new IllegalArgumentException("Incorrect persistence settings specified. " +
+ "Root XML element should be 'persistence'");
+ }
+
+ if (!root.hasAttribute(KEYSPACE_ATTR)) {
+ throw new IllegalArgumentException("Incorrect persistence settings '" + KEYSPACE_ATTR +
+ "' attribute should be specified");
+ }
+
+ if (!root.hasAttribute(TABLE_ATTR)) {
+ throw new IllegalArgumentException("Incorrect persistence settings '" + TABLE_ATTR +
+ "' attribute should be specified");
+ }
+
+ keyspace = root.getAttribute(KEYSPACE_ATTR).trim();
+ tbl = root.getAttribute(TABLE_ATTR).trim();
+
+ if (root.hasAttribute(TTL_ATTR))
+ ttl = extractIntAttribute(root, TTL_ATTR);
+
+ if (!root.hasChildNodes()) {
+ throw new IllegalArgumentException("Incorrect Cassandra persistence settings specification, " +
+ "there are no key and value persistence settings specified");
+ }
+
+ NodeList children = root.getChildNodes();
+ int cnt = children.getLength();
+
+ for (int i = 0; i < cnt; i++) {
+ Node node = children.item(i);
+
+ if (node.getNodeType() != Node.ELEMENT_NODE)
+ continue;
+
+ Element el = (Element)node;
+ String nodeName = el.getNodeName();
+
+ if (nodeName.equals(TABLE_OPTIONS_NODE)) {
+ tblOptions = el.getTextContent();
+ tblOptions = tblOptions.replace("\n", " ").replace("\r", "").replace("\t", " ");
+ }
+ else if (nodeName.equals(KEYSPACE_OPTIONS_NODE)) {
+ keyspaceOptions = el.getTextContent();
+ keyspaceOptions = keyspaceOptions.replace("\n", " ").replace("\r", "").replace("\t", " ");
+ }
+ else if (nodeName.equals(KEY_PERSISTENCE_NODE))
+ keyPersistenceSettings = new KeyPersistenceSettings(el);
+ else if (nodeName.equals(VALUE_PERSISTENCE_NODE))
+ valPersistenceSettings = new ValuePersistenceSettings(el);
+ }
+
+ if (keyPersistenceSettings == null) {
+ throw new IllegalArgumentException("Incorrect Cassandra persistence settings specification, " +
+ "there are no key persistence settings specified");
+ }
+
+ if (valPersistenceSettings == null) {
+ throw new IllegalArgumentException("Incorrect Cassandra persistence settings specification, " +
+ "there are no value persistence settings specified");
+ }
+
+ List<PojoField> keyFields = keyPersistenceSettings.getFields();
+ List<PojoField> valFields = valPersistenceSettings.getFields();
+
+ if (PersistenceStrategy.POJO.equals(keyPersistenceSettings.getStrategy()) &&
+ (keyFields == null || keyFields.isEmpty())) {
+ throw new IllegalArgumentException("Incorrect Cassandra persistence settings specification, " +
+ "there are no key fields found");
+ }
+
+ if (PersistenceStrategy.POJO.equals(valPersistenceSettings.getStrategy()) &&
+ (valFields == null || valFields.isEmpty())) {
+ throw new IllegalArgumentException("Incorrect Cassandra persistence settings specification, " +
+ "there are no value fields found");
+ }
+
+ if (keyFields == null || keyFields.isEmpty() || valFields == null || valFields.isEmpty())
+ return;
+
+ for (PojoField keyField : keyFields) {
+ for (PojoField valField : valFields) {
+ if (keyField.getColumn().equals(valField.getColumn())) {
+ throw new IllegalArgumentException("Incorrect Cassandra persistence settings specification, " +
+ "key column '" + keyField.getColumn() + "' also specified as a value column");
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceController.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceController.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceController.java
new file mode 100644
index 0000000..e734ca3
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceController.java
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.persistence;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Row;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.store.cassandra.common.PropertyMappingHelper;
+import org.apache.ignite.cache.store.cassandra.serializer.Serializer;
+
+/**
+ * Intermediate layer between persistent store (Cassandra) and Ignite cache key/value classes.
+ * Handles all the mappings to/from Java classes into Cassandra and responsible for all the details
+ * of how Java objects should be written/loaded to/from Cassandra.
+ */
+public class PersistenceController {
+ /** Ignite cache key/value persistence settings. */
+ private KeyValuePersistenceSettings persistenceSettings;
+
+ /** CQL statement to insert row into Cassandra table. */
+ private String writeStatement;
+
+ /** CQL statement to delete row from Cassandra table. */
+ private String delStatement;
+
+ /** CQL statement to select value fields from Cassandra table. */
+ private String loadStatement;
+
+ /** CQL statement to select key/value fields from Cassandra table. */
+ private String loadStatementWithKeyFields;
+
+ /**
+ * Constructs persistence controller from Ignite cache persistence settings.
+ *
+ * @param settings persistence settings.
+ */
+ public PersistenceController(KeyValuePersistenceSettings settings) {
+ if (settings == null)
+ throw new IllegalArgumentException("Persistent settings can't be null");
+
+ this.persistenceSettings = settings;
+ }
+
+ /**
+ * Returns Ignite cache persistence settings.
+ *
+ * @return persistence settings.
+ */
+ public KeyValuePersistenceSettings getPersistenceSettings() {
+ return persistenceSettings;
+ }
+
+ /**
+ * Returns Cassandra keyspace to use.
+ *
+ * @return keyspace.
+ */
+ public String getKeyspace() {
+ return persistenceSettings.getKeyspace();
+ }
+
+ /**
+ * Returns Cassandra table to use.
+ *
+ * @return table.
+ */
+ public String getTable() {
+ return persistenceSettings.getTable();
+ }
+
+ /**
+ * Returns CQL statement to insert row into Cassandra table.
+ *
+ * @return CQL statement.
+ */
+ public String getWriteStatement() {
+ if (writeStatement != null)
+ return writeStatement;
+
+ List<String> cols = getKeyValueColumns();
+
+ StringBuilder colsList = new StringBuilder();
+ StringBuilder questionsList = new StringBuilder();
+
+ for (String column : cols) {
+ if (colsList.length() != 0) {
+ colsList.append(", ");
+ questionsList.append(",");
+ }
+
+ colsList.append(column);
+ questionsList.append("?");
+ }
+
+ writeStatement = "insert into " + persistenceSettings.getKeyspace() + "." + persistenceSettings.getTable() + " (" +
+ colsList.toString() + ") values (" + questionsList.toString() + ")";
+
+ if (persistenceSettings.getTTL() != null)
+ writeStatement += " using ttl " + persistenceSettings.getTTL();
+
+ writeStatement += ";";
+
+ return writeStatement;
+ }
+
+ /**
+ * Returns CQL statement to delete row from Cassandra table.
+ *
+ * @return CQL statement.
+ */
+ public String getDeleteStatement() {
+ if (delStatement != null)
+ return delStatement;
+
+ List<String> cols = getKeyColumns();
+
+ StringBuilder statement = new StringBuilder();
+
+ for (String column : cols) {
+ if (statement.length() != 0)
+ statement.append(" and ");
+
+ statement.append(column).append("=?");
+ }
+
+ statement.append(";");
+
+ delStatement = "delete from " +
+ persistenceSettings.getKeyspace() + "." +
+ persistenceSettings.getTable() + " where " +
+ statement.toString();
+
+ return delStatement;
+ }
+
+ /**
+ * Returns CQL statement to select key/value fields from Cassandra table.
+ *
+ * @param includeKeyFields whether to include/exclude key fields from the returned row.
+ *
+ * @return CQL statement.
+ */
+ public String getLoadStatement(boolean includeKeyFields) {
+ if (loadStatement != null && loadStatementWithKeyFields != null)
+ return includeKeyFields ? loadStatementWithKeyFields : loadStatement;
+
+ List<String> valCols = getValueColumns();
+
+ List<String> keyCols = getKeyColumns();
+
+ StringBuilder hdrWithKeyFields = new StringBuilder("select ");
+
+ for (int i = 0; i < keyCols.size(); i++) {
+ if (i > 0)
+ hdrWithKeyFields.append(", ");
+
+ hdrWithKeyFields.append(keyCols.get(i));
+ }
+
+ StringBuilder hdr = new StringBuilder("select ");
+
+ for (int i = 0; i < valCols.size(); i++) {
+ if (i > 0)
+ hdr.append(", ");
+
+ hdrWithKeyFields.append(",");
+
+ hdr.append(valCols.get(i));
+ hdrWithKeyFields.append(valCols.get(i));
+ }
+
+ StringBuilder statement = new StringBuilder();
+
+ statement.append(" from ");
+ statement.append(persistenceSettings.getKeyspace());
+ statement.append(".").append(persistenceSettings.getTable());
+ statement.append(" where ");
+
+ for (int i = 0; i < keyCols.size(); i++) {
+ if (i > 0)
+ statement.append(" and ");
+
+ statement.append(keyCols.get(i)).append("=?");
+ }
+
+ statement.append(";");
+
+ loadStatement = hdr.toString() + statement.toString();
+ loadStatementWithKeyFields = hdrWithKeyFields.toString() + statement.toString();
+
+ return includeKeyFields ? loadStatementWithKeyFields : loadStatement;
+ }
+
+ /**
+ * Binds Ignite cache key object to {@link com.datastax.driver.core.PreparedStatement}.
+ *
+ * @param statement statement to which key object should be bind.
+ * @param key key object.
+ *
+ * @return statement with bounded key.
+ */
+ public BoundStatement bindKey(PreparedStatement statement, Object key) {
+ KeyPersistenceSettings settings = persistenceSettings.getKeyPersistenceSettings();
+
+ Object[] values = getBindingValues(settings.getStrategy(),
+ settings.getSerializer(), settings.getFields(), key);
+
+ return statement.bind(values);
+ }
+
+ /**
+ * Binds Ignite cache key and value object to {@link com.datastax.driver.core.PreparedStatement}.
+ *
+ * @param statement statement to which key and value object should be bind.
+ * @param key key object.
+ * @param val value object.
+ *
+ * @return statement with bounded key and value.
+ */
+ public BoundStatement bindKeyValue(PreparedStatement statement, Object key, Object val) {
+ KeyPersistenceSettings keySettings = persistenceSettings.getKeyPersistenceSettings();
+ Object[] keyValues = getBindingValues(keySettings.getStrategy(),
+ keySettings.getSerializer(), keySettings.getFields(), key);
+
+ ValuePersistenceSettings valSettings = persistenceSettings.getValuePersistenceSettings();
+ Object[] valValues = getBindingValues(valSettings.getStrategy(),
+ valSettings.getSerializer(), valSettings.getFields(), val);
+
+ Object[] values = new Object[keyValues.length + valValues.length];
+
+ int i = 0;
+
+ for (Object keyVal : keyValues) {
+ values[i] = keyVal;
+ i++;
+ }
+
+ for (Object valVal : valValues) {
+ values[i] = valVal;
+ i++;
+ }
+
+ return statement.bind(values);
+ }
+
+ /**
+ * Builds Ignite cache key object from returned Cassandra table row.
+ *
+ * @param row Cassandra table row.
+ *
+ * @return key object.
+ */
+ @SuppressWarnings("UnusedDeclaration")
+ public Object buildKeyObject(Row row) {
+ return buildObject(row, persistenceSettings.getKeyPersistenceSettings());
+ }
+
+ /**
+ * Builds Ignite cache value object from Cassandra table row .
+ *
+ * @param row Cassandra table row.
+ *
+ * @return value object.
+ */
+ public Object buildValueObject(Row row) {
+ return buildObject(row, persistenceSettings.getValuePersistenceSettings());
+ }
+
+ /**
+ * Builds object from Cassandra table row.
+ *
+ * @param row Cassandra table row.
+ * @param settings persistence settings to use.
+ *
+ * @return object.
+ */
+ private Object buildObject(Row row, PersistenceSettings settings) {
+ if (row == null)
+ return null;
+
+ PersistenceStrategy stgy = settings.getStrategy();
+
+ Class clazz = settings.getJavaClass();
+
+ String col = settings.getColumn();
+
+ List<PojoField> fields = settings.getFields();
+
+ if (PersistenceStrategy.PRIMITIVE.equals(stgy))
+ return PropertyMappingHelper.getCassandraColumnValue(row, col, clazz, null);
+
+ if (PersistenceStrategy.BLOB.equals(stgy))
+ return settings.getSerializer().deserialize(row.getBytes(col));
+
+ Object obj;
+
+ try {
+ obj = clazz.newInstance();
+ }
+ catch (Throwable e) {
+ throw new IgniteException("Failed to instantiate object of type '" + clazz.getName() + "' using reflection", e);
+ }
+
+ for (PojoField field : fields)
+ field.setValueFromRow(row, obj, settings.getSerializer());
+
+ return obj;
+ }
+
+ /**
+ * Extracts field values from POJO object and converts them into Java types
+ * which could be mapped to Cassandra types.
+ *
+ * @param stgy persistence strategy to use.
+ * @param serializer serializer to use for BLOBs.
+ * @param fields fields who's values should be extracted.
+ * @param obj object instance who's field values should be extracted.
+ *
+ * @return array of object field values converted into Java object instances having Cassandra compatible types
+ */
+ private Object[] getBindingValues(PersistenceStrategy stgy, Serializer serializer, List<PojoField> fields, Object obj) {
+ if (PersistenceStrategy.PRIMITIVE.equals(stgy)) {
+ if (PropertyMappingHelper.getCassandraType(obj.getClass()) == null ||
+ obj.getClass().equals(ByteBuffer.class) || obj instanceof byte[]) {
+ throw new IllegalArgumentException("Couldn't deserialize instance of class '" +
+ obj.getClass().getName() + "' using PRIMITIVE strategy. Please use BLOB strategy for this case.");
+ }
+
+ return new Object[] {obj};
+ }
+
+ if (PersistenceStrategy.BLOB.equals(stgy))
+ return new Object[] {serializer.serialize(obj)};
+
+ Object[] values = new Object[fields.size()];
+
+ int i = 0;
+
+ for (PojoField field : fields) {
+ Object val = field.getValueFromObject(obj, serializer);
+
+ if (val instanceof byte[])
+ val = ByteBuffer.wrap((byte[]) val);
+
+ values[i] = val;
+
+ i++;
+ }
+
+ return values;
+ }
+
+ /**
+ * Returns list of Cassandra table columns mapped to Ignite cache key and value fields
+ *
+ * @return list of column names
+ */
+ private List<String> getKeyValueColumns() {
+ List<String> cols = getKeyColumns();
+
+ cols.addAll(getValueColumns());
+
+ return cols;
+ }
+
+ /**
+ * Returns list of Cassandra table columns mapped to Ignite cache key fields
+ *
+ * @return list of column names
+ */
+ private List<String> getKeyColumns() {
+ return getColumns(persistenceSettings.getKeyPersistenceSettings());
+ }
+
+ /**
+ * Returns list of Cassandra table columns mapped to Ignite cache value fields
+ *
+ * @return list of column names
+ */
+ private List<String> getValueColumns() {
+ return getColumns(persistenceSettings.getValuePersistenceSettings());
+ }
+
+ /**
+ * Returns list of Cassandra table columns based on persistence strategy to use
+ *
+ * @return list of column names
+ */
+ private List<String> getColumns(PersistenceSettings settings) {
+ List<String> cols = new LinkedList<>();
+
+ if (!PersistenceStrategy.POJO.equals(settings.getStrategy())) {
+ cols.add(settings.getColumn());
+ return cols;
+ }
+
+ for (PojoField field : settings.getFields())
+ cols.add(field.getColumn());
+
+ return cols;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceSettings.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceSettings.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceSettings.java
new file mode 100644
index 0000000..20d790a
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceSettings.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.persistence;
+
+import com.datastax.driver.core.DataType;
+import java.beans.PropertyDescriptor;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.store.cassandra.common.PropertyMappingHelper;
+import org.apache.ignite.cache.store.cassandra.serializer.JavaSerializer;
+import org.apache.ignite.cache.store.cassandra.serializer.Serializer;
+import org.w3c.dom.Element;
+
+/**
+ * Stores persistence settings, which describes how particular key/value
+ * from Ignite cache should be stored in Cassandra.
+ */
+public abstract class PersistenceSettings implements Serializable {
+ /** Xml attribute specifying persistence strategy. */
+ private static final String STRATEGY_ATTR = "strategy";
+
+ /** Xml attribute specifying Cassandra column name. */
+ private static final String COLUMN_ATTR = "column";
+
+ /** Xml attribute specifying BLOB serializer to use. */
+ private static final String SERIALIZER_ATTR = "serializer";
+
+ /** Xml attribute specifying java class of the object to be persisted. */
+ private static final String CLASS_ATTR = "class";
+
+ /** Persistence strategy to use. */
+ private PersistenceStrategy stgy;
+
+ /** Java class of the object to be persisted. */
+ private Class javaCls;
+
+ /** Cassandra table column name where object should be persisted in
+ * case of using BLOB or PRIMITIVE persistence strategy. */
+ private String col;
+
+ /** Serializer for BLOBs. */
+ private Serializer serializer = new JavaSerializer();
+
+ /**
+ * Extracts property descriptor from the descriptors list by its name.
+ *
+ * @param descriptors descriptors list.
+ * @param propName property name.
+ *
+ * @return property descriptor.
+ */
+ public static PropertyDescriptor findPropertyDescriptor(List<PropertyDescriptor> descriptors, String propName) {
+ if (descriptors == null || descriptors.isEmpty() || propName == null || propName.trim().isEmpty())
+ return null;
+
+ for (PropertyDescriptor descriptor : descriptors) {
+ if (descriptor.getName().equals(propName))
+ return descriptor;
+ }
+
+ return null;
+ }
+
+ /**
+ * Constructs persistence settings from corresponding XML element.
+ *
+ * @param el xml element containing persistence settings configuration.
+ */
+ @SuppressWarnings("unchecked")
+ public PersistenceSettings(Element el) {
+ if (el == null)
+ throw new IllegalArgumentException("DOM element representing key/value persistence object can't be null");
+
+ if (!el.hasAttribute(STRATEGY_ATTR)) {
+ throw new IllegalArgumentException("DOM element representing key/value persistence object should have '" +
+ STRATEGY_ATTR + "' attribute");
+ }
+
+ try {
+ stgy = PersistenceStrategy.valueOf(el.getAttribute(STRATEGY_ATTR).trim().toUpperCase());
+ }
+ catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("Incorrect persistence strategy specified: " + el.getAttribute(STRATEGY_ATTR));
+ }
+
+ if (!el.hasAttribute(CLASS_ATTR) && !PersistenceStrategy.BLOB.equals(stgy)) {
+ throw new IllegalArgumentException("DOM element representing key/value persistence object should have '" +
+ CLASS_ATTR + "' attribute or have BLOB persistence strategy");
+ }
+
+ try {
+ javaCls = el.hasAttribute(CLASS_ATTR) ? getClassInstance(el.getAttribute(CLASS_ATTR).trim()) : null;
+ }
+ catch (Throwable e) {
+ throw new IllegalArgumentException("Incorrect java class specified '" + el.getAttribute(CLASS_ATTR) + "' " +
+ "for Cassandra persistence", e);
+ }
+
+ if (!PersistenceStrategy.BLOB.equals(stgy) &&
+ (ByteBuffer.class.equals(javaCls) || byte[].class.equals(javaCls))) {
+ throw new IllegalArgumentException("Java class '" + el.getAttribute(CLASS_ATTR) + "' " +
+ "specified could only be persisted using BLOB persistence strategy");
+ }
+
+ if (PersistenceStrategy.PRIMITIVE.equals(stgy) &&
+ PropertyMappingHelper.getCassandraType(javaCls) == null) {
+ throw new IllegalArgumentException("Current implementation doesn't support persisting '" +
+ javaCls.getName() + "' object using PRIMITIVE strategy");
+ }
+
+ if (PersistenceStrategy.POJO.equals(stgy)) {
+ if (javaCls == null)
+ throw new IllegalStateException("Object java class should be specified for POJO persistence strategy");
+
+ try {
+ javaCls.getConstructor();
+ }
+ catch (Throwable e) {
+ throw new IllegalArgumentException("Java class '" + javaCls.getName() + "' couldn't be used as POJO " +
+ "cause it doesn't have no arguments constructor", e);
+ }
+ }
+
+ if (el.hasAttribute(COLUMN_ATTR)) {
+ if (!PersistenceStrategy.BLOB.equals(stgy) && !PersistenceStrategy.PRIMITIVE.equals(stgy)) {
+ throw new IllegalArgumentException("Incorrect configuration of Cassandra key/value persistence settings, " +
+ "'" + COLUMN_ATTR + "' attribute is only applicable for PRIMITIVE or BLOB strategy");
+ }
+
+ col = el.getAttribute(COLUMN_ATTR).trim();
+ }
+
+ if (el.hasAttribute(SERIALIZER_ATTR)) {
+ if (!PersistenceStrategy.BLOB.equals(stgy) && !PersistenceStrategy.POJO.equals(stgy)) {
+ throw new IllegalArgumentException("Incorrect configuration of Cassandra key/value persistence settings, " +
+ "'" + SERIALIZER_ATTR + "' attribute is only applicable for BLOB and POJO strategies");
+ }
+
+ Object obj = newObjectInstance(el.getAttribute(SERIALIZER_ATTR).trim());
+
+ if (!(obj instanceof Serializer)) {
+ throw new IllegalArgumentException("Incorrect configuration of Cassandra key/value persistence settings, " +
+ "serializer class '" + el.getAttribute(SERIALIZER_ATTR) + "' doesn't implement '" +
+ Serializer.class.getName() + "' interface");
+ }
+
+ serializer = (Serializer)obj;
+ }
+
+ if ((PersistenceStrategy.BLOB.equals(stgy) || PersistenceStrategy.PRIMITIVE.equals(stgy)) && col == null)
+ col = defaultColumnName();
+ }
+
+ /**
+ * Returns java class of the object to be persisted.
+ *
+ * @return java class.
+ */
+ public Class getJavaClass() {
+ return javaCls;
+ }
+
+ /**
+ * Returns persistence strategy to use.
+ *
+ * @return persistence strategy.
+ */
+ public PersistenceStrategy getStrategy() {
+ return stgy;
+ }
+
+ /**
+ * Returns Cassandra table column name where object should be persisted in
+ * case of using BLOB or PRIMITIVE persistence strategy.
+ *
+ * @return column name.
+ */
+ public String getColumn() {
+ return col;
+ }
+
+ /**
+ * Returns serializer to be used for BLOBs.
+ *
+ * @return serializer.
+ */
+ public Serializer getSerializer() {
+ return serializer;
+ }
+
+ /**
+ * Returns list of POJO fields to be persisted.
+ *
+ * @return list of fields.
+ */
+ public abstract List<PojoField> getFields();
+
+ /**
+ * Returns Cassandra table columns DDL, corresponding to POJO fields which should be persisted.
+ *
+ * @return DDL statement for Cassandra table fields
+ */
+ public String getTableColumnsDDL() {
+ if (PersistenceStrategy.BLOB.equals(stgy))
+ return " " + col + " " + DataType.Name.BLOB.toString();
+
+ if (PersistenceStrategy.PRIMITIVE.equals(stgy))
+ return " " + col + " " + PropertyMappingHelper.getCassandraType(javaCls);
+
+ StringBuilder builder = new StringBuilder();
+
+ for (PojoField field : getFields()) {
+ if (builder.length() > 0)
+ builder.append(",\n");
+
+ builder.append(" ").append(field.getColumnDDL());
+ }
+
+ if (builder.length() == 0) {
+ throw new IllegalStateException("There are no POJO fields found for '" + javaCls.toString()
+ + "' class to be presented as a Cassandra primary key");
+ }
+
+ return builder.toString();
+ }
+
+ /**
+ * Returns default name for Cassandra column (if it's not specified explicitly).
+ *
+ * @return column name
+ */
+ protected abstract String defaultColumnName();
+
+ /**
+ * Checks if there are POJO filed with the same name or same Cassandra column specified in persistence settings
+ *
+ * @param fields list of fields to be persisted into Cassandra
+ */
+ protected void checkDuplicates(List<PojoField> fields) {
+ if (fields == null || fields.isEmpty())
+ return;
+
+ for (PojoField field1 : fields) {
+ boolean sameNames = false;
+ boolean sameCols = false;
+
+ for (PojoField field2 : fields) {
+ if (field1.getName().equals(field2.getName())) {
+ if (sameNames) {
+ throw new IllegalArgumentException("Incorrect Cassandra key persistence settings, " +
+ "two POJO fields with the same name '" + field1.getName() + "' specified");
+ }
+
+ sameNames = true;
+ }
+
+ if (field1.getColumn().equals(field2.getColumn())) {
+ if (sameCols) {
+ throw new IllegalArgumentException("Incorrect Cassandra persistence settings, " +
+ "two POJO fields with the same column '" + field1.getColumn() + "' specified");
+ }
+
+ sameCols = true;
+ }
+ }
+ }
+ }
+
+ /**
+ * Instantiates Class object for particular class
+ *
+ * @param clazz class name
+ * @return Class object
+ */
+ private Class getClassInstance(String clazz) {
+ try {
+ return Class.forName(clazz);
+ }
+ catch (ClassNotFoundException ignored) {
+ }
+
+ try {
+ return Class.forName(clazz, true, Thread.currentThread().getContextClassLoader());
+ }
+ catch (ClassNotFoundException ignored) {
+ }
+
+ try {
+ return Class.forName(clazz, true, PersistenceSettings.class.getClassLoader());
+ }
+ catch (ClassNotFoundException ignored) {
+ }
+
+ try {
+ return Class.forName(clazz, true, ClassLoader.getSystemClassLoader());
+ }
+ catch (ClassNotFoundException ignored) {
+ }
+
+ throw new IgniteException("Failed to load class '" + clazz + "' using reflection");
+ }
+
+ /**
+ * Creates new object instance of particular class
+ *
+ * @param clazz class name
+ * @return object
+ */
+ private Object newObjectInstance(String clazz) {
+ try {
+ return getClassInstance(clazz).newInstance();
+ }
+ catch (Throwable e) {
+ throw new IgniteException("Failed to instantiate class '" + clazz + "' using default constructor", e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceStrategy.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceStrategy.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceStrategy.java
new file mode 100644
index 0000000..4b1e2d8
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PersistenceStrategy.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.persistence;
+
+/**
+ * Describes persistence strategy to be used to persist object data into Cassandra.
+ */
+public enum PersistenceStrategy {
+ /**
+ * Stores object value as is, by mapping its value to Cassandra table column with corresponding type.
+ * <p>
+ * Could be used for primitive java type (like Integer, String, Long and etc) which could be directly mapped
+ * to appropriate Cassandra types.
+ */
+ PRIMITIVE,
+
+ /**
+ * Stores object value as BLOB, by mapping its value to Cassandra table column with blob type.
+ * Could be used for any java type. Conversion of java object to BLOB is handled by specified serializer.
+ * <p>
+ * Available serializer implementations:
+ * <ul>
+ * <li>
+ * org.apache.ignite.cache.store.cassandra.serializer.JavaSerializer - uses standard Java
+ * serialization framework.
+ * </li>
+ * <li>
+ * org.apache.ignite.cache.store.cassandra.serializer.KryoSerializer - uses Kryo serialization
+ * framework.
+ * </li>
+ * </ul>
+ */
+ BLOB,
+
+ /**
+ * Stores each field of an object as a column having corresponding type in Cassandra table.
+ * Provides ability to utilize Cassandra secondary indexes for object fields.
+ * <p>
+ * Could be used for objects which follow JavaBeans convention and having empty public constructor.
+ * Object fields should be:
+ * <ul>
+ * <li>Primitive java types like int, long, String and etc.</li>
+ * <li>Collections of primitive java types like List<Integer>, Map<Integer, String>, Set<Long></li>
+ * </ul>
+ */
+ POJO
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java
new file mode 100644
index 0000000..af569fd
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoField.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.persistence;
+
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.Row;
+import java.beans.PropertyDescriptor;
+import java.io.Serializable;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.apache.ignite.cache.store.cassandra.common.PropertyMappingHelper;
+import org.apache.ignite.cache.store.cassandra.serializer.Serializer;
+import org.w3c.dom.Element;
+
+/**
+ * Descriptor for particular field in a POJO object, specifying how this field
+ * should be written to or loaded from Cassandra.
+ */
+public abstract class PojoField implements Serializable {
+ /** Name attribute of XML element describing Pojo field. */
+ private static final String NAME_ATTR = "name";
+
+ /** Column attribute of XML element describing Pojo field. */
+ private static final String COLUMN_ATTR = "column";
+
+ /** Field name. */
+ private String name;
+
+ /** Java class to which the field belongs. */
+ private Class javaCls;
+
+ /** Field column name in Cassandra table. */
+ private String col;
+
+ /** Field column DDL. */
+ private String colDDL;
+
+ /** Field property descriptor. */
+ private transient PropertyDescriptor desc;
+
+ /**
+ * Creates instance of {@link PojoField} based on it's description in XML element.
+ *
+ * @param el XML element describing Pojo field
+ * @param pojoCls Pojo java class.
+ */
+ public PojoField(Element el, Class<?> pojoCls) {
+ if (el == null)
+ throw new IllegalArgumentException("DOM element representing POJO field object can't be null");
+
+ if (!el.hasAttribute(NAME_ATTR)) {
+ throw new IllegalArgumentException("DOM element representing POJO field object should have '"
+ + NAME_ATTR + "' attribute");
+ }
+
+ this.name = el.getAttribute(NAME_ATTR).trim();
+ this.col = el.hasAttribute(COLUMN_ATTR) ? el.getAttribute(COLUMN_ATTR).trim() : name.toLowerCase();
+
+ init(PropertyMappingHelper.getPojoPropertyDescriptor(pojoCls, name));
+ }
+
+ /**
+ * Creates instance of {@link PojoField} from its property descriptor.
+ *
+ * @param desc Field property descriptor.
+ */
+ public PojoField(PropertyDescriptor desc) {
+ this.name = desc.getName();
+
+ QuerySqlField sqlField = desc.getReadMethod() != null ?
+ desc.getReadMethod().getAnnotation(QuerySqlField.class) :
+ desc.getWriteMethod() == null ?
+ null :
+ desc.getWriteMethod().getAnnotation(QuerySqlField.class);
+
+ this.col = sqlField != null && sqlField.name() != null ? sqlField.name() : name.toLowerCase();
+
+ init(desc);
+
+ if (sqlField != null)
+ init(sqlField);
+ }
+
+ /**
+ * @return field name.
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * @return Cassandra table column name.
+ */
+ public String getColumn() {
+ return col;
+ }
+
+ /**
+ * @return Cassandra table column DDL statement.
+ */
+ public String getColumnDDL() {
+ return colDDL;
+ }
+
+ /**
+ * Gets field value as an object having Cassandra compatible type.
+ * This it could be stored directly into Cassandra without any conversions.
+ *
+ * @param obj Object instance.
+ * @param serializer {@link org.apache.ignite.cache.store.cassandra.serializer.Serializer} to use.
+ * @return Object to store in Cassandra table column.
+ */
+ public Object getValueFromObject(Object obj, Serializer serializer) {
+ try {
+ Object val = propDesc().getReadMethod().invoke(obj);
+
+ if (val == null)
+ return null;
+
+ DataType.Name cassandraType = PropertyMappingHelper.getCassandraType(val.getClass());
+
+ if (cassandraType != null)
+ return val;
+
+ if (serializer == null) {
+ throw new IllegalStateException("Can't serialize value from object '" +
+ val.getClass().getName() + "' field '" + name + "', cause there is no BLOB serializer specified");
+ }
+
+ return serializer.serialize(val);
+ }
+ catch (Throwable e) {
+ throw new IgniteException("Failed to get value of the field '" + name + "' from the instance " +
+ " of '" + obj.getClass().toString() + "' class", e);
+ }
+ }
+
+ /**
+ * Sets object field value from a {@link com.datastax.driver.core.Row} returned by Cassandra CQL statement.
+ *
+ * @param row {@link com.datastax.driver.core.Row}
+ * @param obj object which field should be populated from {@link com.datastax.driver.core.Row}
+ * @param serializer {@link org.apache.ignite.cache.store.cassandra.serializer.Serializer} to use.
+ */
+ public void setValueFromRow(Row row, Object obj, Serializer serializer) {
+ Object val = PropertyMappingHelper.getCassandraColumnValue(row, col, propDesc().getPropertyType(), serializer);
+
+ try {
+ propDesc().getWriteMethod().invoke(obj, val);
+ }
+ catch (Throwable e) {
+ throw new IgniteException("Failed to set value of the field '" + name + "' of the instance " +
+ " of '" + obj.getClass().toString() + "' class", e);
+ }
+ }
+
+ /**
+ * Initializes field info from annotation.
+ *
+ * @param sqlField {@link QuerySqlField} annotation.
+ */
+ protected abstract void init(QuerySqlField sqlField);
+
+ /**
+ * Initializes field info from property descriptor.
+ *
+ * @param desc {@link PropertyDescriptor} descriptor.
+ */
+ protected void init(PropertyDescriptor desc) {
+ if (desc.getReadMethod() == null) {
+ throw new IllegalArgumentException("Field '" + desc.getName() +
+ "' of the class instance '" + desc.getPropertyType().getName() +
+ "' doesn't provide getter method");
+ }
+
+ if (desc.getWriteMethod() == null) {
+ throw new IllegalArgumentException("Field '" + desc.getName() +
+ "' of POJO object instance of the class '" + desc.getPropertyType().getName() +
+ "' doesn't provide write method");
+ }
+
+ if (!desc.getReadMethod().isAccessible())
+ desc.getReadMethod().setAccessible(true);
+
+ if (!desc.getWriteMethod().isAccessible())
+ desc.getWriteMethod().setAccessible(true);
+
+ DataType.Name cassandraType = PropertyMappingHelper.getCassandraType(desc.getPropertyType());
+ cassandraType = cassandraType == null ? DataType.Name.BLOB : cassandraType;
+
+ this.javaCls = desc.getReadMethod().getDeclaringClass();
+ this.desc = desc;
+ this.colDDL = col + " " + cassandraType.toString();
+ }
+
+ /**
+ * Returns property descriptor of the POJO field
+ *
+ * @return Property descriptor
+ */
+ private PropertyDescriptor propDesc() {
+ return desc != null ? desc : (desc = PropertyMappingHelper.getPojoPropertyDescriptor(javaCls, name));
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoKeyField.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoKeyField.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoKeyField.java
new file mode 100644
index 0000000..4e86d74
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoKeyField.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.persistence;
+
+import java.beans.PropertyDescriptor;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.w3c.dom.Element;
+
+/**
+ * Descriptor for Ignite key POJO class
+ */
+public class PojoKeyField extends PojoField {
+
+ /**
+ * Specifies sort order for POJO key field
+ */
+ public enum SortOrder {
+ /** Ascending sort order. */
+ ASC,
+ /** Descending sort order. */
+ DESC
+ }
+
+ /** Xml attribute specifying sort order. */
+ private static final String SORT_ATTR = "sort";
+
+ /** Sort order. */
+ private SortOrder sortOrder = null;
+
+ /**
+ * Constructs Ignite cache key POJO object descriptor.
+ *
+ * @param el xml configuration element.
+ * @param pojoCls java class of key POJO field.
+ */
+ public PojoKeyField(Element el, Class pojoCls) {
+ super(el, pojoCls);
+
+ if (el.hasAttribute(SORT_ATTR)) {
+ try {
+ sortOrder = SortOrder.valueOf(el.getAttribute(SORT_ATTR).trim().toUpperCase());
+ }
+ catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("Incorrect sort order '" + el.getAttribute(SORT_ATTR) + "' specified");
+ }
+ }
+ }
+
+ /**
+ * Constructs Ignite cache key POJO object descriptor.
+ *
+ * @param desc property descriptor.
+ */
+ public PojoKeyField(PropertyDescriptor desc) {
+ super(desc);
+ }
+
+ /**
+ * Returns sort order for the field.
+ *
+ * @return sort order.
+ */
+ public SortOrder getSortOrder() {
+ return sortOrder;
+ }
+
+ /**
+ * Initializes descriptor from {@link QuerySqlField} annotation.
+ *
+ * @param sqlField {@link QuerySqlField} annotation.
+ */
+ protected void init(QuerySqlField sqlField) {
+ if (sqlField.descending())
+ sortOrder = SortOrder.DESC;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java
new file mode 100644
index 0000000..c29f1db
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/PojoValueField.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.persistence;
+
+import java.beans.PropertyDescriptor;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+import org.w3c.dom.Element;
+
+/**
+ * Descriptor for Ignite value POJO class
+ */
+public class PojoValueField extends PojoField {
+ /** Xml attribute specifying that Cassandra column is static. */
+ private static final String STATIC_ATTR = "static";
+
+ /** Xml attribute specifying that secondary index should be created for Cassandra column. */
+ private static final String INDEX_ATTR = "index";
+
+ /** Xml attribute specifying secondary index custom class. */
+ private static final String INDEX_CLASS_ATTR = "indexClass";
+
+ /** Xml attribute specifying secondary index options. */
+ private static final String INDEX_OPTIONS_ATTR = "indexOptions";
+
+ /** Indicates if Cassandra column should be indexed. */
+ private Boolean isIndexed;
+
+ /** Custom java class for Cassandra secondary index. */
+ private String idxCls;
+
+ /** Secondary index options. */
+ private String idxOptions;
+
+ /** Indicates if Cassandra column is static. */
+ private Boolean isStatic;
+
+ /**
+ * Constructs Ignite cache value field descriptor.
+ *
+ * @param el field descriptor xml configuration element.
+ * @param pojoCls field java class
+ */
+ public PojoValueField(Element el, Class pojoCls) {
+ super(el, pojoCls);
+
+ if (el.hasAttribute(STATIC_ATTR))
+ isStatic = Boolean.parseBoolean(el.getAttribute(STATIC_ATTR).trim().toLowerCase());
+
+ if (el.hasAttribute(INDEX_ATTR))
+ isIndexed = Boolean.parseBoolean(el.getAttribute(INDEX_ATTR).trim().toLowerCase());
+
+ if (el.hasAttribute(INDEX_CLASS_ATTR))
+ idxCls = el.getAttribute(INDEX_CLASS_ATTR).trim();
+
+ if (el.hasAttribute(INDEX_OPTIONS_ATTR)) {
+ idxOptions = el.getAttribute(INDEX_OPTIONS_ATTR).trim();
+
+ if (!idxOptions.toLowerCase().startsWith("with")) {
+ idxOptions = idxOptions.toLowerCase().startsWith("options") ?
+ "with " + idxOptions :
+ "with options = " + idxOptions;
+ }
+ }
+ }
+
+ /**
+ * Constructs Ignite cache value field descriptor.
+ *
+ * @param desc field property descriptor.
+ */
+ public PojoValueField(PropertyDescriptor desc) {
+ super(desc);
+ }
+
+ /**
+ * Returns DDL for Cassandra columns corresponding to POJO field.
+ *
+ * @return columns DDL.
+ */
+ public String getColumnDDL() {
+ String colDDL = super.getColumnDDL();
+
+ if (isStatic != null && isStatic)
+ colDDL = colDDL + " static";
+
+ return colDDL;
+ }
+
+ /**
+ * Indicates if secondary index should be created for the field.
+ *
+ * @return true/false if secondary index should/shouldn't be created for the field.
+ */
+ public boolean isIndexed() {
+ return isIndexed != null && isIndexed;
+ }
+
+ /**
+ * Returns DDL for the field secondary index.
+ *
+ * @param keyspace Cassandra keyspace where index should be created.
+ * @param tbl Cassandra table for which secondary index should be created.
+ *
+ * @return secondary index DDL.
+ */
+ public String getIndexDDL(String keyspace, String tbl) {
+ if (isIndexed == null || !isIndexed)
+ return null;
+
+ StringBuilder builder = new StringBuilder();
+
+ if (idxCls != null)
+ builder.append("create custom index if not exists on ").append(keyspace).append(".").append(tbl);
+ else
+ builder.append("create index if not exists on ").append(keyspace).append(".").append(tbl);
+
+ builder.append(" (").append(getColumn()).append(")");
+
+ if (idxCls != null)
+ builder.append(" using '").append(idxCls).append("'");
+
+ if (idxOptions != null)
+ builder.append(" ").append(idxOptions);
+
+ return builder.append(";").toString();
+ }
+
+ /**
+ * Initializes descriptor from {@link QuerySqlField} annotation.
+ *
+ * @param sqlField {@link QuerySqlField} annotation.
+ */
+ protected void init(QuerySqlField sqlField) {
+ if (sqlField.index())
+ isIndexed = true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/ValuePersistenceSettings.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/ValuePersistenceSettings.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/ValuePersistenceSettings.java
new file mode 100644
index 0000000..877167d
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/ValuePersistenceSettings.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.persistence;
+
+import java.beans.PropertyDescriptor;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.ignite.cache.store.cassandra.common.PropertyMappingHelper;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+
+/**
+ * Stores persistence settings for Ignite cache value
+ */
+public class ValuePersistenceSettings extends PersistenceSettings {
+ /** XML element describing value field settings. */
+ private static final String FIELD_ELEMENT = "field";
+
+ /** Value fields. */
+ private List<PojoField> fields = new LinkedList<>();
+
+ /**
+ * Creates class instance from XML configuration.
+ *
+ * @param el XML element describing value persistence settings.
+ */
+ public ValuePersistenceSettings(Element el) {
+ super(el);
+
+ if (!PersistenceStrategy.POJO.equals(getStrategy()))
+ return;
+
+ NodeList nodes = el.getElementsByTagName(FIELD_ELEMENT);
+
+ fields = detectFields(nodes);
+
+ if (fields.isEmpty())
+ throw new IllegalStateException("Failed to initialize value fields for class '" + getJavaClass().getName() + "'");
+
+ checkDuplicates(fields);
+ }
+
+ /**
+ * @return List of value fields.
+ */
+ public List<PojoField> getFields() {
+ return fields == null ? null : Collections.unmodifiableList(fields);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected String defaultColumnName() {
+ return "value";
+ }
+
+ /**
+ * Extracts POJO fields from a list of corresponding XML field nodes.
+ *
+ * @param fieldNodes Field nodes to process.
+ * @return POJO fields list.
+ */
+ private List<PojoField> detectFields(NodeList fieldNodes) {
+ List<PojoField> list = new LinkedList<>();
+
+ if (fieldNodes == null || fieldNodes.getLength() == 0) {
+ List<PropertyDescriptor> primitivePropDescriptors = PropertyMappingHelper.getPojoPropertyDescriptors(getJavaClass(), true);
+ for (PropertyDescriptor descriptor : primitivePropDescriptors)
+ list.add(new PojoValueField(descriptor));
+
+ return list;
+ }
+
+ List<PropertyDescriptor> allPropDescriptors = PropertyMappingHelper.getPojoPropertyDescriptors(getJavaClass(), false);
+
+ int cnt = fieldNodes.getLength();
+
+ for (int i = 0; i < cnt; i++) {
+ PojoValueField field = new PojoValueField((Element)fieldNodes.item(i), getJavaClass());
+
+ PropertyDescriptor desc = findPropertyDescriptor(allPropDescriptors, field.getName());
+
+ if (desc == null) {
+ throw new IllegalArgumentException("Specified POJO field '" + field.getName() +
+ "' doesn't exist in '" + getJavaClass().getName() + "' class");
+ }
+
+ list.add(field);
+ }
+
+ return list;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/package-info.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/package-info.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/package-info.java
new file mode 100644
index 0000000..76d32fb
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains persistent settings configuration
+ */
+package org.apache.ignite.cache.store.cassandra.persistence;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/JavaSerializer.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/JavaSerializer.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/JavaSerializer.java
new file mode 100644
index 0000000..44d2d47
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/JavaSerializer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.serializer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Serializer based on standard Java serialization.
+ */
+public class JavaSerializer implements Serializer {
+ /** */
+ private static final int DFLT_BUFFER_SIZE = 4096;
+
+ /** {@inheritDoc} */
+ @Override public ByteBuffer serialize(Object obj) {
+ if (obj == null)
+ return null;
+
+ ByteArrayOutputStream stream = null;
+ ObjectOutputStream out = null;
+
+ try {
+ stream = new ByteArrayOutputStream(DFLT_BUFFER_SIZE);
+
+ out = new ObjectOutputStream(stream);
+ out.writeObject(obj);
+ out.flush();
+
+ return ByteBuffer.wrap(stream.toByteArray());
+ }
+ catch (IOException e) {
+ throw new IllegalStateException("Failed to serialize object of the class '" + obj.getClass().getName() + "'", e);
+ }
+ finally {
+ U.closeQuiet(out);
+ U.closeQuiet(stream);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object deserialize(ByteBuffer buf) {
+ ByteArrayInputStream stream = null;
+ ObjectInputStream in = null;
+
+ try {
+ stream = new ByteArrayInputStream(buf.array());
+ in = new ObjectInputStream(stream);
+
+ return in.readObject();
+ }
+ catch (Throwable e) {
+ throw new IllegalStateException("Failed to deserialize object from byte stream", e);
+ }
+ finally {
+ U.closeQuiet(in);
+ U.closeQuiet(stream);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/Serializer.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/Serializer.java b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/Serializer.java
new file mode 100644
index 0000000..5b8d542
--- /dev/null
+++ b/modules/cassandra/store/src/main/java/org/apache/ignite/cache/store/cassandra/serializer/Serializer.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store.cassandra.serializer;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+
+/**
+ * Interface which should be implemented by all serializers responsible
+ * for writing/loading data to/from Cassandra in binary (BLOB) format.
+ */
+public interface Serializer extends Serializable {
+ /**
+ * Serializes object into byte buffer.
+ *
+ * @param obj Object to serialize.
+ * @return Byte buffer with binary data.
+ */
+ public ByteBuffer serialize(Object obj);
+
+ /**
+ * Deserializes object from byte buffer.
+ *
+ * @param buf Byte buffer.
+ * @return Deserialized object.
+ */
+ public Object deserialize(ByteBuffer buf);
+}