You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2016/09/14 01:38:04 UTC
[18/20] ignite git commit: IGNITE-3172 Refactoring Ignite-Cassandra
serializers. - Fixes #956.
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStoreFactory.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStoreFactory.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStoreFactory.java
deleted file mode 100644
index 7584dfb..0000000
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/CassandraCacheStoreFactory.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.store.cassandra;
-
-import javax.cache.configuration.Factory;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.cache.store.cassandra.datasource.DataSource;
-import org.apache.ignite.cache.store.cassandra.persistence.KeyValuePersistenceSettings;
-import org.apache.ignite.internal.IgniteComponentType;
-import org.apache.ignite.internal.util.spring.IgniteSpringHelper;
-import org.apache.ignite.resources.SpringApplicationContextResource;
-
-/**
- * Factory class to instantiate {@link CassandraCacheStore}.
- *
- * @param <K> Ignite cache key type
- * @param <V> Ignite cache value type
- */
-public class CassandraCacheStoreFactory<K, V> implements Factory<CassandraCacheStore<K, V>> {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** Auto-injected Spring ApplicationContext resource. */
- @SpringApplicationContextResource
- private Object appCtx;
-
- /** Name of data source bean. */
- private String dataSrcBean;
-
- /** Name of persistence settings bean. */
- private String persistenceSettingsBean;
-
- /** Data source. */
- private transient DataSource dataSrc;
-
- /** Persistence settings. */
- private KeyValuePersistenceSettings persistenceSettings;
-
- /** Max worker thread count. These threads are responsible for loading the cache. */
- private int maxPoolSize = Runtime.getRuntime().availableProcessors();
-
- /** {@inheritDoc} */
- @Override public CassandraCacheStore<K, V> create() {
- return new CassandraCacheStore<>(getDataSource(), getPersistenceSettings(), getMaxPoolSize());
- }
-
- /**
- * Sets data source.
- *
- * @param dataSrc Data source.
- *
- * @return {@code This} for chaining.
- */
- @SuppressWarnings("UnusedDeclaration")
- public CassandraCacheStoreFactory<K, V> setDataSource(DataSource dataSrc) {
- this.dataSrc = dataSrc;
-
- return this;
- }
-
- /**
- * Sets data source bean name.
- *
- * @param beanName Data source bean name.
- * @return {@code This} for chaining.
- */
- public CassandraCacheStoreFactory<K, V> setDataSourceBean(String beanName) {
- this.dataSrcBean = beanName;
-
- return this;
- }
-
- /**
- * Sets persistence settings.
- *
- * @param settings Persistence settings.
- * @return {@code This} for chaining.
- */
- @SuppressWarnings("UnusedDeclaration")
- public CassandraCacheStoreFactory<K, V> setPersistenceSettings(KeyValuePersistenceSettings settings) {
- this.persistenceSettings = settings;
-
- return this;
- }
-
- /**
- * Sets persistence settings bean name.
- *
- * @param beanName Persistence settings bean name.
- * @return {@code This} for chaining.
- */
- public CassandraCacheStoreFactory<K, V> setPersistenceSettingsBean(String beanName) {
- this.persistenceSettingsBean = beanName;
-
- return this;
- }
-
- /**
- * @return Data source.
- */
- private DataSource getDataSource() {
- if (dataSrc != null)
- return dataSrc;
-
- if (dataSrcBean == null)
- throw new IllegalStateException("Either DataSource bean or DataSource itself should be specified");
-
- if (appCtx == null) {
- throw new IllegalStateException("Failed to get Cassandra DataSource cause Spring application " +
- "context wasn't injected into CassandraCacheStoreFactory");
- }
-
- Object obj = loadSpringContextBean(appCtx, dataSrcBean);
-
- if (!(obj instanceof DataSource))
- throw new IllegalStateException("Incorrect connection bean '" + dataSrcBean + "' specified");
-
- return dataSrc = (DataSource)obj;
- }
-
- /**
- * @return Persistence settings.
- */
- private KeyValuePersistenceSettings getPersistenceSettings() {
- if (persistenceSettings != null)
- return persistenceSettings;
-
- if (persistenceSettingsBean == null) {
- throw new IllegalStateException("Either persistence settings bean or persistence settings itself " +
- "should be specified");
- }
-
- if (appCtx == null) {
- throw new IllegalStateException("Failed to get Cassandra persistence settings cause Spring application " +
- "context wasn't injected into CassandraCacheStoreFactory");
- }
-
- Object obj = loadSpringContextBean(appCtx, persistenceSettingsBean);
-
- if (!(obj instanceof KeyValuePersistenceSettings)) {
- throw new IllegalStateException("Incorrect persistence settings bean '" +
- persistenceSettingsBean + "' specified");
- }
-
- return persistenceSettings = (KeyValuePersistenceSettings)obj;
- }
-
- /**
- * Gets the maximum worker thread count. These threads are responsible for query execution.
- *
- * @return Maximum workers thread count.
- */
- public int getMaxPoolSize() {
- return maxPoolSize;
- }
-
- /**
- * Sets the maximum worker thread count. These threads are responsible for query execution.
- *
- * @param maxPoolSize Max workers thread count.
- * @return {@code This} for chaining.
- */
- public CassandraCacheStoreFactory<K, V> setMaxPoolSize(int maxPoolSize) {
- this.maxPoolSize = maxPoolSize;
-
- return this;
- }
-
- /**
- * Loads bean from Spring ApplicationContext.
- *
- * @param appCtx Application context.
- * @param beanName Bean name to load.
- * @return Loaded bean.
- */
- private Object loadSpringContextBean(Object appCtx, String beanName) {
- try {
- IgniteSpringHelper spring = IgniteComponentType.SPRING.create(false);
- return spring.loadBeanFromAppContext(appCtx, beanName);
- }
- catch (Exception e) {
- throw new IgniteException("Failed to load bean in application context [beanName=" + beanName + ", igniteConfig=" + appCtx + ']', e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/common/CassandraHelper.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/common/CassandraHelper.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/common/CassandraHelper.java
deleted file mode 100644
index d3bff7f..0000000
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/common/CassandraHelper.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.store.cassandra.common;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.exceptions.InvalidQueryException;
-import com.datastax.driver.core.exceptions.NoHostAvailableException;
-import com.datastax.driver.core.exceptions.ReadTimeoutException;
-import java.util.regex.Pattern;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-/**
- * Helper class providing methods to work with Cassandra sessions and exceptions.
- */
-public class CassandraHelper {
- /** Cassandra error message if specified keyspace doesn't exist. */
- private static final Pattern KEYSPACE_EXIST_ERROR1 = Pattern.compile("Keyspace [0-9a-zA-Z_]+ does not exist");
-
- /** Cassandra error message if trying to create table inside nonexistent keyspace. */
- private static final Pattern KEYSPACE_EXIST_ERROR2 = Pattern.compile("Cannot add table '[0-9a-zA-Z_]+' to non existing keyspace.*");
-
- /** Cassandra error message if specified table doesn't exist. */
- private static final Pattern TABLE_EXIST_ERROR = Pattern.compile("unconfigured table [0-9a-zA-Z_]+");
-
- /** Cassandra error message if trying to use prepared statement created from another session. */
- private static final String PREP_STATEMENT_CLUSTER_INSTANCE_ERROR = "You may have used a PreparedStatement that " +
- "was created with another Cluster instance";
-
- /** Closes Cassandra driver session and the cluster it belongs to. */
- public static void closeSession(Session driverSes) {
- if (driverSes == null)
- return;
-
- Cluster cluster = driverSes.getCluster();
-
- if (!driverSes.isClosed())
- U.closeQuiet(driverSes);
-
- if (!cluster.isClosed())
- U.closeQuiet(cluster);
- }
-
- /**
- * Checks whether the exception or any of its causes is a Cassandra keyspace absence error.
- *
- * @param e Exception to check.
- * @return {@code true} in case of keyspace absence error.
- */
- public static boolean isKeyspaceAbsenceError(Throwable e) {
- while (e != null) {
- if (e instanceof InvalidQueryException &&
- (KEYSPACE_EXIST_ERROR1.matcher(e.getMessage()).matches() ||
- KEYSPACE_EXIST_ERROR2.matcher(e.getMessage()).matches()))
- return true;
-
- e = e.getCause();
- }
-
- return false;
- }
-
- /**
- * Checks whether the exception or any of its causes is a Cassandra table absence error (a missing keyspace also counts).
- *
- * @param e Exception to check.
- * @return {@code true} in case of table absence error.
- */
- public static boolean isTableAbsenceError(Throwable e) {
- while (e != null) {
- if (e instanceof InvalidQueryException &&
- (TABLE_EXIST_ERROR.matcher(e.getMessage()).matches() ||
- KEYSPACE_EXIST_ERROR1.matcher(e.getMessage()).matches() ||
- KEYSPACE_EXIST_ERROR2.matcher(e.getMessage()).matches()))
- return true;
-
- e = e.getCause();
- }
-
- return false;
- }
-
- /**
- * Checks whether the exception or any of its causes is a Cassandra host availability error (no host available or read timeout).
- *
- * @param e Exception to check.
- * @return {@code true} in case of host not available error.
- */
- public static boolean isHostsAvailabilityError(Throwable e) {
- while (e != null) {
- if (e instanceof NoHostAvailableException ||
- e instanceof ReadTimeoutException)
- return true;
-
- e = e.getCause();
- }
-
- return false;
- }
-
- /**
- * Checks whether a Cassandra error occurred because a prepared statement created in one session was used in another session.
- *
- * @param e Exception to check.
- * @return {@code true} in case of invalid usage of prepared statement.
- */
- public static boolean isPreparedStatementClusterError(Throwable e) {
- while (e != null) {
- if (e instanceof InvalidQueryException && e.getMessage().contains(PREP_STATEMENT_CLUSTER_INSTANCE_ERROR))
- return true;
-
- e = e.getCause();
- }
-
- return false;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/common/PropertyMappingHelper.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/common/PropertyMappingHelper.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/common/PropertyMappingHelper.java
deleted file mode 100644
index 9053a93..0000000
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/common/PropertyMappingHelper.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.store.cassandra.common;
-
-import com.datastax.driver.core.DataType;
-import com.datastax.driver.core.Row;
-import java.beans.PropertyDescriptor;
-import java.lang.annotation.Annotation;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.net.InetAddress;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import org.apache.commons.beanutils.PropertyUtils;
-import org.apache.ignite.cache.store.cassandra.serializer.Serializer;
-
-/**
- * Helper class providing a bunch of methods to discover fields of POJO objects and
- * map built-in Java types to appropriate Cassandra types.
- */
-public class PropertyMappingHelper {
- /** Bytes array Class type. */
- private static final Class BYTES_ARRAY_CLASS = (new byte[] {}).getClass();
-
- /** Mapping from Java to Cassandra types. */
- private static final Map<Class, DataType.Name> JAVA_TO_CASSANDRA_MAPPING = new HashMap<Class, DataType.Name>() {{
- put(String.class, DataType.Name.TEXT);
- put(Integer.class, DataType.Name.INT);
- put(int.class, DataType.Name.INT);
- put(Short.class, DataType.Name.INT);
- put(short.class, DataType.Name.INT);
- put(Long.class, DataType.Name.BIGINT);
- put(long.class, DataType.Name.BIGINT);
- put(Double.class, DataType.Name.DOUBLE);
- put(double.class, DataType.Name.DOUBLE);
- put(Boolean.class, DataType.Name.BOOLEAN);
- put(boolean.class, DataType.Name.BOOLEAN);
- put(Float.class, DataType.Name.FLOAT);
- put(float.class, DataType.Name.FLOAT);
- put(ByteBuffer.class, DataType.Name.BLOB);
- put(BYTES_ARRAY_CLASS, DataType.Name.BLOB);
- put(BigDecimal.class, DataType.Name.DECIMAL);
- put(InetAddress.class, DataType.Name.INET);
- put(Date.class, DataType.Name.TIMESTAMP);
- put(UUID.class, DataType.Name.UUID);
- put(BigInteger.class, DataType.Name.VARINT);
- }};
-
- /**
- * Maps the specified Java type to the corresponding Cassandra type.
- *
- * @param clazz Java class.
- *
- * @return Cassandra type, or {@code null} if the class has no mapping.
- */
- public static DataType.Name getCassandraType(Class clazz) {
- return JAVA_TO_CASSANDRA_MAPPING.get(clazz);
- }
-
- /**
- * Returns property descriptor by class property name.
- *
- * @param clazz class from which to get property descriptor.
- * @param prop name of the property.
- *
- * @return property descriptor.
- */
- public static PropertyDescriptor getPojoPropertyDescriptor(Class clazz, String prop) {
- List<PropertyDescriptor> descriptors = getPojoPropertyDescriptors(clazz, false);
-
- if (descriptors == null || descriptors.isEmpty())
- throw new IllegalArgumentException("POJO class " + clazz.getName() + " doesn't have '" + prop + "' property");
-
- for (PropertyDescriptor descriptor : descriptors) {
- if (descriptor.getName().equals(prop))
- return descriptor;
- }
-
- throw new IllegalArgumentException("POJO class " + clazz.getName() + " doesn't have '" + prop + "' property");
- }
-
- /**
- * Extracts all property descriptors from a class.
- *
- * @param clazz class which property descriptors should be extracted.
- * @param primitive boolean flag indicating that only property descriptors for primitive properties should be extracted.
- *
- * @return list of class property descriptors
- */
- public static List<PropertyDescriptor> getPojoPropertyDescriptors(Class clazz, boolean primitive) {
- return getPojoPropertyDescriptors(clazz, null, primitive);
- }
-
- /**
- * Extracts all property descriptors having specific annotation from a class.
- *
- * @param clazz class which property descriptors should be extracted.
- * @param annotation annotation to look for.
- * @param primitive boolean flag indicating that only property descriptors for primitive properties should be extracted.
- *
- * @return list of class property descriptors
- */
- public static <T extends Annotation> List<PropertyDescriptor> getPojoPropertyDescriptors(Class clazz,
- Class<T> annotation, boolean primitive) {
- PropertyDescriptor[] descriptors = PropertyUtils.getPropertyDescriptors(clazz);
-
- List<PropertyDescriptor> list = new ArrayList<>(descriptors == null ? 1 : descriptors.length);
-
- if (descriptors == null || descriptors.length == 0)
- return list;
-
- for (PropertyDescriptor descriptor : descriptors) {
- if (descriptor.getReadMethod() == null || descriptor.getWriteMethod() == null ||
- (primitive && !isPrimitivePropertyDescriptor(descriptor)))
- continue;
-
- if (annotation == null || descriptor.getReadMethod().getAnnotation(annotation) != null)
- list.add(descriptor);
- }
-
- return list;
- }
-
- /**
- * Checks if property descriptor describes a primitive property, i.e. one whose type maps directly to a Cassandra type.
- *
- * @param desc property descriptor.
- *
- * @return {@code true} if the property is primitive.
- */
- public static boolean isPrimitivePropertyDescriptor(PropertyDescriptor desc) {
- return PropertyMappingHelper.JAVA_TO_CASSANDRA_MAPPING.containsKey(desc.getPropertyType());
- }
-
- /**
- * Returns value of specific column in the row returned by CQL statement.
- *
- * @param row row returned by CQL statement.
- * @param col column name.
- * @param clazz Java class to which the column value should be cast.
- * @param serializer serializer to use if the column stores a BLOB; may be {@code null} otherwise.
- *
- * @return row column value.
- */
- public static Object getCassandraColumnValue(Row row, String col, Class clazz, Serializer serializer) {
- if (String.class.equals(clazz))
- return row.getString(col);
-
- if (Integer.class.equals(clazz) || int.class.equals(clazz))
- return row.getInt(col);
-
- if (Short.class.equals(clazz) || short.class.equals(clazz))
- return (short)row.getInt(col);
-
- if (Long.class.equals(clazz) || long.class.equals(clazz))
- return row.getLong(col);
-
- if (Double.class.equals(clazz) || double.class.equals(clazz))
- return row.getDouble(col);
-
- if (Boolean.class.equals(clazz) || boolean.class.equals(clazz))
- return row.getBool(col);
-
- if (Float.class.equals(clazz) || float.class.equals(clazz))
- return row.getFloat(col);
-
- if (ByteBuffer.class.equals(clazz))
- return row.getBytes(col);
-
- if (PropertyMappingHelper.BYTES_ARRAY_CLASS.equals(clazz)) {
- ByteBuffer buf = row.getBytes(col);
-
- return buf == null ? null : buf.array();
- }
-
- if (BigDecimal.class.equals(clazz))
- return row.getDecimal(col);
-
- if (InetAddress.class.equals(clazz))
- return row.getInet(col);
-
- if (Date.class.equals(clazz))
- return row.getTimestamp(col);
-
- if (UUID.class.equals(clazz))
- return row.getUUID(col);
-
- if (BigInteger.class.equals(clazz))
- return row.getVarint(col);
-
- if (serializer == null) {
- throw new IllegalStateException("Can't deserialize value from '" + col + "' Cassandra column, " +
- "cause there is no BLOB serializer specified");
- }
-
- ByteBuffer buf = row.getBytes(col);
-
- return buf == null ? null : serializer.deserialize(buf);
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/common/RandomSleeper.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/common/RandomSleeper.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/common/RandomSleeper.java
deleted file mode 100644
index 6745a16..0000000
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/common/RandomSleeper.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.store.cassandra.common;
-
-import java.util.Random;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
-
-/**
- * Provides a sleep method that sleeps for a randomly selected time from the specified range and
- * incrementally shifts the sleep time range for each subsequent sleep attempt.
- *
- */
-public class RandomSleeper {
- /** */
- private int min;
-
- /** */
- private int max;
-
- /** */
- private int incr;
-
- /** */
- private IgniteLogger log;
-
- /** */
- private Random random = new Random(System.currentTimeMillis());
-
- /** */
- private int summary = 0;
-
- /**
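- * Creates sleeper instance; each sleep is logged via {@code log} unless it is {@code null}.
- *
- * @param min minimum sleep time (in milliseconds), must be positive
- * @param max maximum sleep time (in milliseconds), must be greater than {@code min}
- * @param incr time range shift increment (in milliseconds), must be at least 10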
- */
- public RandomSleeper(int min, int max, int incr, IgniteLogger log) {
- if (min <= 0)
- throw new IllegalArgumentException("Incorrect min time specified: " + min);
-
- if (max <= min)
- throw new IllegalArgumentException("Incorrect max time specified: " + max);
-
- if (incr < 10)
- throw new IllegalArgumentException("Incorrect increment specified: " + incr);
-
- this.min = min;
- this.max = max;
- this.incr = incr;
- this.log = log;
- }
-
- /**
- * Sleeps for a random time within the current range, then shifts the range up by the increment.
- */
- public void sleep() {
- try {
- int timeout = random.nextInt(max - min + 1) + min;
-
- if (log != null)
- log.info("Sleeping for " + timeout + "ms");
-
- Thread.sleep(timeout);
-
- summary += timeout;
-
- if (log != null)
- log.info("Sleep completed");
- }
- catch (InterruptedException e) {
- throw new IgniteException("Random sleep interrupted", e);
- }
-
- min += incr;
- max += incr;
- }
-
- /**
- * Returns the accumulated sleep time of all completed sleeps.
- *
- * @return Total sleep time in milliseconds.
- */
- public int getSleepSummary() {
- return summary;
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/common/SystemHelper.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/common/SystemHelper.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/common/SystemHelper.java
deleted file mode 100644
index 5d51488..0000000
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/common/SystemHelper.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.store.cassandra.common;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-
-/**
- * Helper class providing system information about the host (IP address, host name, line separator).
- */
-public class SystemHelper {
- /** System line separator. */
- public static final String LINE_SEPARATOR = System.getProperty("line.separator");
-
- /** Host name. */
- public static final String HOST_NAME;
-
- /** Host IP address */
- public static final String HOST_IP;
-
- static {
- try {
- InetAddress addr = InetAddress.getLocalHost();
- HOST_NAME = addr.getHostName();
- HOST_IP = addr.getHostAddress();
- }
- catch (UnknownHostException e) {
- throw new IllegalStateException("Failed to get host/ip of current computer", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/common/package-info.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/common/package-info.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/common/package-info.java
deleted file mode 100644
index c4f5d3b..0000000
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/common/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Contains commonly used helper classes
- */
-package org.apache.ignite.cache.store.cassandra.common;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/Credentials.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/Credentials.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/Credentials.java
deleted file mode 100644
index a2358a6..0000000
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/Credentials.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.store.cassandra.datasource;
-
-import java.io.Serializable;
-
-/**
- * Provides credentials for Cassandra (instead of specifying user/password directly in Spring context XML).
- */
-public interface Credentials extends Serializable {
- /**
- * Returns user name
- *
- * @return user name
- */
- public String getUser();
-
- /**
- * Returns password
- *
- * @return password
- */
- public String getPassword();
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java
deleted file mode 100644
index f582aac..0000000
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java
+++ /dev/null
@@ -1,647 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.store.cassandra.datasource;
-
-import com.datastax.driver.core.AuthProvider;
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.NettyOptions;
-import com.datastax.driver.core.PoolingOptions;
-import com.datastax.driver.core.ProtocolOptions;
-import com.datastax.driver.core.ProtocolVersion;
-import com.datastax.driver.core.SSLOptions;
-import com.datastax.driver.core.SocketOptions;
-import com.datastax.driver.core.policies.AddressTranslator;
-import com.datastax.driver.core.policies.LoadBalancingPolicy;
-import com.datastax.driver.core.policies.ReconnectionPolicy;
-import com.datastax.driver.core.policies.RetryPolicy;
-import com.datastax.driver.core.policies.SpeculativeExecutionPolicy;
-
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.io.Serializable;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Locale;
-import java.util.UUID;
-
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cache.store.cassandra.session.CassandraSession;
-import org.apache.ignite.cache.store.cassandra.session.CassandraSessionImpl;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
-/**
- * Data source abstraction to specify configuration of the Cassandra session to be used.
- * <p>
- * Settings are collected through the setters; {@link #session(IgniteLogger)} lazily builds a
- * {@link CassandraSession} wrapper from them and caches it. A setter that changes a setting drops the
- * cached wrapper, so the next {@link #session(IgniteLogger)} call builds a new one.
- */
-public class DataSource implements Externalizable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /**
-     * Null object, used as a replacement for those Cassandra connection options which
-     * don't support serialization (RetryPolicy, LoadBalancingPolicy and etc).
-     */
-    private static final UUID NULL_OBJECT = UUID.fromString("45ffae47-3193-5910-84a2-048fe65735d9");
-
-    /** Number of rows to immediately fetch in CQL statement execution. */
-    private Integer fetchSize;
-
-    /** Consistency level for READ operations. */
-    private ConsistencyLevel readConsistency;
-
-    /** Consistency level for WRITE operations. */
-    private ConsistencyLevel writeConsistency;
-
-    /** Username to use for authentication. */
-    private String user;
-
-    /** Password to use for authentication. */
-    private String pwd;
-
-    /** Port to use for Cassandra connection. */
-    private Integer port;
-
-    /** List of contact points to connect to Cassandra cluster. */
-    private List<InetAddress> contactPoints;
-
-    /** List of contact points with ports to connect to Cassandra cluster. */
-    private List<InetSocketAddress> contactPointsWithPorts;
-
-    /** Maximum time to wait for schema agreement before returning from a DDL query. */
-    private Integer maxSchemaAgreementWaitSeconds;
-
-    /** The native protocol version to use. */
-    private Integer protoVer;
-
-    /** Compression to use for the transport, stored as the upper-case enum constant name. */
-    private String compression;
-
-    /** Use SSL for communications with Cassandra. */
-    private Boolean useSSL;
-
-    /** Enables metrics collection. */
-    private Boolean collectMetrix;
-
-    /** Enables JMX reporting of the metrics. */
-    private Boolean jmxReporting;
-
-    /** Credentials to use for authentication. */
-    private Credentials creds;
-
-    /** Load balancing policy to use. */
-    private LoadBalancingPolicy loadBalancingPlc;
-
-    /** Reconnection policy to use. */
-    private ReconnectionPolicy reconnectionPlc;
-
-    /** Retry policy to use. */
-    private RetryPolicy retryPlc;
-
-    /** Address translator to use. */
-    private AddressTranslator addrTranslator;
-
-    /** Speculative execution policy to use. */
-    private SpeculativeExecutionPolicy speculativeExecutionPlc;
-
-    /** Authentication provider to use. */
-    private AuthProvider authProvider;
-
-    /** SSL options to use. */
-    private SSLOptions sslOptions;
-
-    /** Connection pooling options to use. */
-    private PoolingOptions poolingOptions;
-
-    /** Socket options to use. */
-    private SocketOptions sockOptions;
-
-    /** Netty options to use for connection. */
-    private NettyOptions nettyOptions;
-
-    /** Cassandra session wrapper instance. It is not part of the serialized form. */
-    private volatile CassandraSession ses;
-
-    /**
-     * Sets user name to use for authentication.
-     *
-     * @param user user name
-     */
-    @SuppressWarnings("UnusedDeclaration")
-    public void setUser(String user) {
-        this.user = user;
-
-        invalidate();
-    }
-
-    /**
-     * Sets password to use for authentication.
-     *
-     * @param pwd password
-     */
-    @SuppressWarnings("UnusedDeclaration")
-    public void setPassword(String pwd) {
-        this.pwd = pwd;
-
-        invalidate();
-    }
-
-    /**
-     * Sets port to use for Cassandra connection.
-     *
-     * @param port port
-     */
-    @SuppressWarnings("UnusedDeclaration")
-    public void setPort(int port) {
-        this.port = port;
-
-        invalidate();
-    }
-
-    /**
-     * Adds contact points to connect to Cassandra cluster.
-     * <p>
-     * A point containing {@code ':'} is treated as {@code host:port} and stored unresolved; any other
-     * point is resolved through {@link InetAddress#getByName(String)}. Points are appended to the ones
-     * added by earlier calls. A {@code null} or empty array is ignored.
-     *
-     * @param points contact points
-     * @throws IllegalArgumentException If a contact point can't be parsed or resolved.
-     */
-    public void setContactPoints(String... points) {
-        if (points == null || points.length == 0)
-            return;
-
-        for (String point : points) {
-            if (point.contains(":")) {
-                if (contactPointsWithPorts == null)
-                    contactPointsWithPorts = new LinkedList<>();
-
-                String[] chunks = point.split(":");
-
-                try {
-                    contactPointsWithPorts.add(InetSocketAddress.createUnresolved(chunks[0].trim(), Integer.parseInt(chunks[1].trim())));
-                }
-                catch (Exception e) {
-                    // Covers a missing/non-numeric port and an out-of-range port value; JVM errors are not wrapped.
-                    throw new IllegalArgumentException("Incorrect contact point '" + point + "' specified for Cassandra cache storage", e);
-                }
-            }
-            else {
-                if (contactPoints == null)
-                    contactPoints = new LinkedList<>();
-
-                try {
-                    contactPoints.add(InetAddress.getByName(point));
-                }
-                catch (Exception e) {
-                    throw new IllegalArgumentException("Incorrect contact point '" + point + "' specified for Cassandra cache storage", e);
-                }
-            }
-        }
-
-        invalidate();
-    }
-
-    /**
-     * Sets maximum time to wait for schema agreement before returning from a DDL query.
-     *
-     * @param seconds Maximum wait time in seconds.
-     */
-    @SuppressWarnings("UnusedDeclaration")
-    public void setMaxSchemaAgreementWaitSeconds(int seconds) {
-        maxSchemaAgreementWaitSeconds = seconds;
-
-        invalidate();
-    }
-
-    /**
-     * Sets the native protocol version to use.
-     *
-     * @param ver version number
-     */
-    @SuppressWarnings("UnusedDeclaration")
-    public void setProtocolVersion(int ver) {
-        protoVer = ver;
-
-        invalidate();
-    }
-
-    /**
-     * Sets compression algorithm to use for the transport.
-     * <p>
-     * The name is matched against the {@link ProtocolOptions.Compression} constants ignoring case and
-     * surrounding whitespace. {@code null} or a blank string clears the setting. The value is validated
-     * before it is stored, so a rejected name leaves the previous setting untouched.
-     *
-     * @param compression Compression algorithm.
-     * @throws IgniteException If the name doesn't match any {@link ProtocolOptions.Compression} constant.
-     */
-    @SuppressWarnings("UnusedDeclaration")
-    public void setCompression(String compression) {
-        String normalized = compression == null || compression.trim().isEmpty() ?
-            null : compression.trim().toUpperCase(Locale.ROOT);
-
-        if (normalized != null) {
-            try {
-                ProtocolOptions.Compression.valueOf(normalized);
-            }
-            catch (IllegalArgumentException e) {
-                throw new IgniteException("Incorrect compression '" + compression + "' specified for Cassandra connection", e);
-            }
-        }
-
-        this.compression = normalized;
-
-        invalidate();
-    }
-
-    /**
-     * Enables SSL for communications with Cassandra.
-     *
-     * @param use Flag to enable/disable SSL.
-     */
-    @SuppressWarnings("UnusedDeclaration")
-    public void setUseSSL(boolean use) {
-        useSSL = use;
-
-        invalidate();
-    }
-
-    /**
-     * Enables metrics collection.
-     *
-     * @param collect Flag to enable/disable metrics collection.
-     */
-    @SuppressWarnings("UnusedDeclaration")
-    public void setCollectMetrix(boolean collect) {
-        collectMetrix = collect;
-
-        invalidate();
-    }
-
-    /**
-     * Enables JMX reporting of the metrics.
-     *
-     * @param enableReporting Flag to enable/disable JMX reporting.
-     */
-    @SuppressWarnings("UnusedDeclaration")
-    public void setJmxReporting(boolean enableReporting) {
-        jmxReporting = enableReporting;
-
-        invalidate();
-    }
-
-    /**
-     * Sets number of rows to immediately fetch in CQL statement execution.
-     *
-     * @param size Number of rows to fetch.
-     */
-    @SuppressWarnings("UnusedDeclaration")
-    public void setFetchSize(int size) {
-        fetchSize = size;
-
-        invalidate();
-    }
-
-    /**
-     * Set consistency level for READ operations.
-     *
-     * @param level Consistency level.
-     */
-    public void setReadConsistency(String level) {
-        readConsistency = parseConsistencyLevel(level);
-
-        invalidate();
-    }
-
-    /**
-     * Set consistency level for WRITE operations.
-     *
-     * @param level Consistency level.
-     */
-    public void setWriteConsistency(String level) {
-        writeConsistency = parseConsistencyLevel(level);
-
-        invalidate();
-    }
-
-    /**
-     * Sets credentials to use for authentication.
-     *
-     * @param creds Credentials.
-     */
-    public void setCredentials(Credentials creds) {
-        this.creds = creds;
-
-        invalidate();
-    }
-
-    /**
-     * Sets load balancing policy.
-     *
-     * @param plc Load balancing policy.
-     */
-    public void setLoadBalancingPolicy(LoadBalancingPolicy plc) {
-        loadBalancingPlc = plc;
-
-        invalidate();
-    }
-
-    /**
-     * Sets reconnection policy.
-     *
-     * @param plc Reconnection policy.
-     */
-    @SuppressWarnings("UnusedDeclaration")
-    public void setReconnectionPolicy(ReconnectionPolicy plc) {
-        reconnectionPlc = plc;
-
-        invalidate();
-    }
-
-    /**
-     * Sets retry policy.
-     *
-     * @param plc Retry policy.
-     */
-    @SuppressWarnings("UnusedDeclaration")
-    public void setRetryPolicy(RetryPolicy plc) {
-        retryPlc = plc;
-
-        invalidate();
-    }
-
-    /**
-     * Sets address translator.
-     *
-     * @param translator Address translator.
-     */
-    @SuppressWarnings("UnusedDeclaration")
-    public void setAddressTranslator(AddressTranslator translator) {
-        addrTranslator = translator;
-
-        invalidate();
-    }
-
-    /**
-     * Sets speculative execution policy.
-     *
-     * @param plc Speculative execution policy.
-     */
-    @SuppressWarnings("UnusedDeclaration")
-    public void setSpeculativeExecutionPolicy(SpeculativeExecutionPolicy plc) {
-        speculativeExecutionPlc = plc;
-
-        invalidate();
-    }
-
-    /**
-     * Sets authentication provider.
-     *
-     * @param provider Authentication provider.
-     */
-    @SuppressWarnings("UnusedDeclaration")
-    public void setAuthProvider(AuthProvider provider) {
-        authProvider = provider;
-
-        invalidate();
-    }
-
-    /**
-     * Sets SSL options.
-     *
-     * @param options SSL options.
-     */
-    @SuppressWarnings("UnusedDeclaration")
-    public void setSslOptions(SSLOptions options) {
-        sslOptions = options;
-
-        invalidate();
-    }
-
-    /**
-     * Sets pooling options.
-     *
-     * @param options pooling options to use.
-     */
-    @SuppressWarnings("UnusedDeclaration")
-    public void setPoolingOptions(PoolingOptions options) {
-        poolingOptions = options;
-
-        invalidate();
-    }
-
-    /**
-     * Sets socket options to use.
-     *
-     * @param options Socket options.
-     */
-    @SuppressWarnings("UnusedDeclaration")
-    public void setSocketOptions(SocketOptions options) {
-        sockOptions = options;
-
-        invalidate();
-    }
-
-    /**
-     * Sets netty options to use.
-     *
-     * @param options netty options.
-     */
-    @SuppressWarnings("UnusedDeclaration")
-    public void setNettyOptions(NettyOptions options) {
-        nettyOptions = options;
-
-        invalidate();
-    }
-
-    /**
-     * Creates Cassandra session wrapper if it wasn't created yet and returns it.
-     * <p>
-     * Only the settings that were explicitly configured are applied to the cluster builder.
-     *
-     * @param log logger
-     * @return Cassandra session wrapper
-     * @throws IgniteException If the stored compression name is not a valid compression option.
-     */
-    @SuppressWarnings("deprecation")
-    public synchronized CassandraSession session(IgniteLogger log) {
-        if (ses != null)
-            return ses;
-
-        Cluster.Builder builder = Cluster.builder();
-
-        if (user != null)
-            builder = builder.withCredentials(user, pwd);
-
-        if (port != null)
-            builder = builder.withPort(port);
-
-        if (contactPoints != null)
-            builder = builder.addContactPoints(contactPoints);
-
-        if (contactPointsWithPorts != null)
-            builder = builder.addContactPointsWithPorts(contactPointsWithPorts);
-
-        if (maxSchemaAgreementWaitSeconds != null)
-            builder = builder.withMaxSchemaAgreementWaitSeconds(maxSchemaAgreementWaitSeconds);
-
-        if (protoVer != null)
-            builder = builder.withProtocolVersion(ProtocolVersion.fromInt(protoVer));
-
-        if (compression != null) {
-            try {
-                // Enum constant names are upper-case; the value is normalized here as well because it may
-                // have been restored from a serialized form that kept the user's original casing.
-                builder = builder.withCompression(
-                    ProtocolOptions.Compression.valueOf(compression.trim().toUpperCase(Locale.ROOT)));
-            }
-            catch (IllegalArgumentException e) {
-                throw new IgniteException("Incorrect compression option '" + compression + "' specified for Cassandra connection", e);
-            }
-        }
-
-        if (useSSL != null && useSSL)
-            builder = builder.withSSL();
-
-        // Applied after the plain SSL flag, so explicitly provided SSL options are the last ones set.
-        if (sslOptions != null)
-            builder = builder.withSSL(sslOptions);
-
-        if (collectMetrix != null && !collectMetrix)
-            builder = builder.withoutMetrics();
-
-        if (jmxReporting != null && !jmxReporting)
-            builder = builder.withoutJMXReporting();
-
-        // Applied after user/password, so a credentials object is the last one set when both are configured.
-        if (creds != null)
-            builder = builder.withCredentials(creds.getUser(), creds.getPassword());
-
-        if (loadBalancingPlc != null)
-            builder = builder.withLoadBalancingPolicy(loadBalancingPlc);
-
-        if (reconnectionPlc != null)
-            builder = builder.withReconnectionPolicy(reconnectionPlc);
-
-        if (retryPlc != null)
-            builder = builder.withRetryPolicy(retryPlc);
-
-        if (addrTranslator != null)
-            builder = builder.withAddressTranslator(addrTranslator);
-
-        if (speculativeExecutionPlc != null)
-            builder = builder.withSpeculativeExecutionPolicy(speculativeExecutionPlc);
-
-        if (authProvider != null)
-            builder = builder.withAuthProvider(authProvider);
-
-        if (poolingOptions != null)
-            builder = builder.withPoolingOptions(poolingOptions);
-
-        if (sockOptions != null)
-            builder = builder.withSocketOptions(sockOptions);
-
-        if (nettyOptions != null)
-            builder = builder.withNettyOptions(nettyOptions);
-
-        return ses = new CassandraSessionImpl(builder, fetchSize, readConsistency, writeConsistency, log);
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Driver option objects that are not {@link Serializable} are written as a placeholder and are
-     * restored as {@code null}. The field order must match {@link #readExternal(ObjectInput)}.
-     */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        out.writeObject(fetchSize);
-        out.writeObject(readConsistency);
-        out.writeObject(writeConsistency);
-        U.writeString(out, user);
-        U.writeString(out, pwd);
-        out.writeObject(port);
-        out.writeObject(contactPoints);
-        out.writeObject(contactPointsWithPorts);
-        out.writeObject(maxSchemaAgreementWaitSeconds);
-        out.writeObject(protoVer);
-        U.writeString(out, compression);
-        out.writeObject(useSSL);
-        out.writeObject(collectMetrix);
-        out.writeObject(jmxReporting);
-        out.writeObject(creds);
-        writeObject(out, loadBalancingPlc);
-        writeObject(out, reconnectionPlc);
-        writeObject(out, addrTranslator);
-        writeObject(out, speculativeExecutionPlc);
-        writeObject(out, authProvider);
-        writeObject(out, sslOptions);
-        writeObject(out, poolingOptions);
-        writeObject(out, sockOptions);
-        writeObject(out, nettyOptions);
-        // Retry policy used to be dropped on serialization; it is appended last so that the layout of
-        // the fields written before it stays unchanged.
-        writeObject(out, retryPlc);
-    }
-
-    /**
-     * {@inheritDoc}
-     * <p>
-     * Reads the fields in exactly the order {@link #writeExternal(ObjectOutput)} writes them.
-     */
-    @SuppressWarnings("unchecked")
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        fetchSize = (Integer)in.readObject();
-        readConsistency = (ConsistencyLevel)in.readObject();
-        writeConsistency = (ConsistencyLevel)in.readObject();
-        user = U.readString(in);
-        pwd = U.readString(in);
-        port = (Integer)in.readObject();
-        contactPoints = (List<InetAddress>)in.readObject();
-        contactPointsWithPorts = (List<InetSocketAddress>)in.readObject();
-        maxSchemaAgreementWaitSeconds = (Integer)in.readObject();
-        protoVer = (Integer)in.readObject();
-        compression = U.readString(in);
-        useSSL = (Boolean)in.readObject();
-        collectMetrix = (Boolean)in.readObject();
-        jmxReporting = (Boolean)in.readObject();
-        creds = (Credentials)in.readObject();
-        loadBalancingPlc = (LoadBalancingPolicy)readObject(in);
-        reconnectionPlc = (ReconnectionPolicy)readObject(in);
-        addrTranslator = (AddressTranslator)readObject(in);
-        speculativeExecutionPlc = (SpeculativeExecutionPolicy)readObject(in);
-        authProvider = (AuthProvider)readObject(in);
-        sslOptions = (SSLOptions)readObject(in);
-        poolingOptions = (PoolingOptions)readObject(in);
-        sockOptions = (SocketOptions)readObject(in);
-        nettyOptions = (NettyOptions)readObject(in);
-        retryPlc = (RetryPolicy)readObject(in);
-    }
-
-    /**
-     * Helper method used to serialize class members. Writes {@link #NULL_OBJECT} instead of the object
-     * when it is {@code null} or doesn't implement {@link Serializable}.
-     *
-     * @param out the stream to write the object to
-     * @param obj the object to be written
-     * @throws IOException Includes any I/O exceptions that may occur
-     */
-    private void writeObject(ObjectOutput out, Object obj) throws IOException {
-        // instanceof is false for null, so a single check covers both cases.
-        out.writeObject(obj instanceof Serializable ? obj : NULL_OBJECT);
-    }
-
-    /**
-     * Helper method used to deserialize class members. Maps the {@link #NULL_OBJECT} placeholder back
-     * to {@code null}.
-     *
-     * @param in the stream to read data from in order to restore the object
-     * @throws IOException Includes any I/O exceptions that may occur
-     * @throws ClassNotFoundException If the class for an object being restored cannot be found
-     * @return deserialized object
-     */
-    private Object readObject(ObjectInput in) throws IOException, ClassNotFoundException {
-        Object obj = in.readObject();
-        return NULL_OBJECT.equals(obj) ? null : obj;
-    }
-
-    /**
-     * Parses consistency level provided as string. The name is matched against the
-     * {@link ConsistencyLevel} constants ignoring case and surrounding whitespace.
-     *
-     * @param level consistency level string.
-     *
-     * @return consistency level, or {@code null} if {@code level} is {@code null}.
-     * @throws IgniteException If the name doesn't match any {@link ConsistencyLevel} constant.
-     */
-    private ConsistencyLevel parseConsistencyLevel(String level) {
-        if (level == null)
-            return null;
-
-        try {
-            // Locale.ROOT keeps the conversion independent of the JVM default locale
-            // (e.g. "serial" would otherwise not become "SERIAL" under a Turkish locale).
-            return ConsistencyLevel.valueOf(level.trim().toUpperCase(Locale.ROOT));
-        }
-        catch (IllegalArgumentException e) {
-            throw new IgniteException("Incorrect consistency level '" + level + "' specified for Cassandra connection", e);
-        }
-    }
-
-    /**
-     * Invalidates session: drops the cached wrapper so that it is rebuilt on the next
-     * {@link #session(IgniteLogger)} call.
-     */
-    private synchronized void invalidate() {
-        ses = null;
-    }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/PlainCredentials.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/PlainCredentials.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/PlainCredentials.java
deleted file mode 100644
index 46ebdc5..0000000
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/PlainCredentials.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.store.cassandra.datasource;
-
-/**
- * Simple implementation of {@link Credentials} which just uses its constructor to hold user/password values.
- * <p>
- * Instances are immutable: both values are assigned once in the constructor and never changed.
- */
-public class PlainCredentials implements Credentials {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** User name. */
-    private final String user;
-
-    /** User password. */
-    private final String pwd;
-
-    /**
-     * Creates credentials object.
-     *
-     * @param user User name.
-     * @param pwd User password.
-     */
-    public PlainCredentials(String user, String pwd) {
-        this.user = user;
-        this.pwd = pwd;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String getUser() {
-        return user;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String getPassword() {
-        return pwd;
-    }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/package-info.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/package-info.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/package-info.java
deleted file mode 100644
index d5003ae..0000000
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Contains data source implementation
- */
-package org.apache.ignite.cache.store.cassandra.datasource;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/package-info.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/package-info.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/package-info.java
deleted file mode 100644
index 46f5635..0000000
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Contains {@link org.apache.ignite.cache.store.CacheStore} implementation backed by Cassandra database
- */
-package org.apache.ignite.cache.store.cassandra;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyPersistenceSettings.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyPersistenceSettings.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyPersistenceSettings.java
deleted file mode 100644
index 393dbe4..0000000
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyPersistenceSettings.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.store.cassandra.persistence;
-
-import java.beans.PropertyDescriptor;
-import java.util.LinkedList;
-import java.util.List;
-import org.apache.ignite.cache.affinity.AffinityKeyMapped;
-import org.apache.ignite.cache.store.cassandra.common.PropertyMappingHelper;
-import org.w3c.dom.Element;
-import org.w3c.dom.NodeList;
-
-/**
- * Persistence settings of an Ignite cache key: which POJO fields form the partition key and the
- * cluster key of the Cassandra table, and the DDL fragments derived from them.
- */
-public class KeyPersistenceSettings extends PersistenceSettings {
-    /** Partition key XML tag. */
-    private static final String PARTITION_KEY_ELEMENT = "partitionKey";
-
-    /** Cluster key XML tag. */
-    private static final String CLUSTER_KEY_ELEMENT = "clusterKey";
-
-    /** POJO field XML tag. */
-    private static final String FIELD_ELEMENT = "field";
-
-    /** All key fields: partition key fields followed by cluster key fields. */
-    private List<PojoField> fields = new LinkedList<>();
-
-    /** Fields mapped to the partition key. */
-    private List<PojoField> partKeyFields = new LinkedList<>();
-
-    /** Fields mapped to the cluster key. */
-    private List<PojoField> clusterKeyFields = new LinkedList<>();
-
-    /**
-     * Builds key persistence settings from their XML configuration. Field mappings are only
-     * processed for the POJO persistence strategy; for any other strategy the field lists stay empty.
-     *
-     * @param el XML element storing key persistence settings
-     * @throws IllegalArgumentException If cluster key mapping is given without partition key mapping.
-     * @throws IllegalStateException If no partition key fields could be determined.
-     */
-    public KeyPersistenceSettings(Element el) {
-        super(el);
-
-        boolean pojo = PersistenceStrategy.POJO.equals(getStrategy());
-
-        if (!pojo)
-            return;
-
-        Element partKeysNode = firstElementByTag(el, PARTITION_KEY_ELEMENT);
-        Element clusterKeysNode = firstElementByTag(el, CLUSTER_KEY_ELEMENT);
-
-        if (clusterKeysNode != null && partKeysNode == null) {
-            throw new IllegalArgumentException("It's not allowed to specify cluster key fields mapping, but " +
-                "doesn't specify partition key mappings");
-        }
-
-        partKeyFields = detectFields(partKeysNode, getPartitionKeyDescriptors());
-
-        if (partKeyFields == null || partKeyFields.isEmpty()) {
-            throw new IllegalStateException("Failed to initialize partition key fields for class '" +
-                getJavaClass().getName() + "'");
-        }
-
-        clusterKeyFields = detectFields(clusterKeysNode, getClusterKeyDescriptors(partKeyFields));
-
-        // Partition key fields go first, cluster key fields follow.
-        List<PojoField> allFields = new LinkedList<>(partKeyFields);
-        allFields.addAll(clusterKeyFields);
-
-        fields = allFields;
-
-        checkDuplicates(fields);
-    }
-
-    /** {@inheritDoc} */
-    @Override public List<PojoField> getFields() {
-        return fields;
-    }
-
-    /**
-     * Builds the {@code primary key} clause of the Cassandra table DDL.
-     *
-     * @return DDL fragment with the partition key columns and, if any, the cluster key columns.
-     */
-    public String getPrimaryKeyDDL() {
-        String partKey = commaSeparated(getPartitionKeyColumns());
-        String clusterKey = commaSeparated(getClusterKeyColumns());
-
-        if (clusterKey.length() == 0)
-            return " primary key ((" + partKey + "))";
-
-        return " primary key ((" + partKey + "), " + clusterKey + ")";
-    }
-
-    /**
-     * Builds the {@code clustering order by} clause of the Cassandra table DDL.
-     *
-     * @return DDL fragment, or {@code null} if no cluster key field has a sort order.
-     */
-    public String getClusteringDDL() {
-        StringBuilder ordering = new StringBuilder();
-
-        for (PojoField keyField : clusterKeyFields) {
-            PojoKeyField.SortOrder order = ((PojoKeyField)keyField).getSortOrder();
-
-            // Fields without an explicit sort order don't take part in the clause.
-            if (order != null) {
-                if (ordering.length() != 0)
-                    ordering.append(", ");
-
-                String direction = PojoKeyField.SortOrder.ASC.equals(order) ? "asc" : "desc";
-
-                ordering.append(keyField.getColumn()).append(" ").append(direction);
-            }
-        }
-
-        if (ordering.length() == 0)
-            return null;
-
-        return "clustering order by (" + ordering.toString() + ")";
-    }
-
-    /** {@inheritDoc} */
-    @Override protected String defaultColumnName() {
-        return "key";
-    }
-
-    /**
-     * Collects the partition key columns of the Cassandra table. For the BLOB and PRIMITIVE
-     * strategies this is the single key column, otherwise the columns of the partition key fields.
-     *
-     * @return List of column names.
-     */
-    private List<String> getPartitionKeyColumns() {
-        List<String> names = new LinkedList<>();
-
-        PersistenceStrategy stgy = getStrategy();
-
-        boolean singleColumn = PersistenceStrategy.BLOB.equals(stgy) || PersistenceStrategy.PRIMITIVE.equals(stgy);
-
-        if (singleColumn)
-            names.add(getColumn());
-        else if (partKeyFields != null) {
-            for (PojoField keyField : partKeyFields)
-                names.add(keyField.getColumn());
-        }
-
-        return names;
-    }
-
-    /**
-     * Collects the cluster key columns of the Cassandra table.
-     *
-     * @return List of column names, empty if there are no cluster key fields.
-     */
-    private List<String> getClusterKeyColumns() {
-        List<String> names = new LinkedList<>();
-
-        if (clusterKeyFields == null)
-            return names;
-
-        for (PojoField keyField : clusterKeyFields)
-            names.add(keyField.getColumn());
-
-        return names;
-    }
-
-    /**
-     * Determines key fields either from the XML element or, when the element is absent, from the
-     * provided property descriptors.
-     *
-     * @param el XML element describing fields, may be {@code null}.
-     * @param descriptors POJO fields descriptors, may be {@code null}.
-     * @return List of key fields, never {@code null}.
-     * @throws IllegalArgumentException If the element has no field entries or names a field which
-     *      has no matching descriptor.
-     */
-    private List<PojoField> detectFields(Element el, List<PropertyDescriptor> descriptors) {
-        List<PojoField> detected = new LinkedList<>();
-
-        // No XML mapping: fall back to the descriptors (if there are any).
-        if (el == null) {
-            if (descriptors != null) {
-                for (PropertyDescriptor desc : descriptors)
-                    detected.add(new PojoKeyField(desc));
-            }
-
-            return detected;
-        }
-
-        NodeList fieldNodes = el.getElementsByTagName(FIELD_ELEMENT);
-
-        int total = fieldNodes != null ? fieldNodes.getLength() : 0;
-
-        if (total == 0) {
-            throw new IllegalArgumentException("Incorrect configuration of Cassandra key persistence settings, " +
-                "no cluster key fields specified inside '" + PARTITION_KEY_ELEMENT + "/" +
-                CLUSTER_KEY_ELEMENT + "' element");
-        }
-
-        for (int idx = 0; idx < total; idx++) {
-            PojoKeyField keyField = new PojoKeyField((Element)fieldNodes.item(idx), getJavaClass());
-
-            if (findPropertyDescriptor(descriptors, keyField.getName()) == null) {
-                throw new IllegalArgumentException("Specified POJO field '" + keyField.getName() +
-                    "' doesn't exist in '" + getJavaClass().getName() + "' class");
-            }
-
-            detected.add(keyField);
-        }
-
-        return detected;
-    }
-
-    /**
-     * Looks up the candidate descriptors for the partition key: the ones selected by
-     * {@link AffinityKeyMapped} if there are any, otherwise the general set of POJO descriptors.
-     *
-     * @return POJO field descriptors for partition key.
-     */
-    private List<PropertyDescriptor> getPartitionKeyDescriptors() {
-        List<PropertyDescriptor> annotated = PropertyMappingHelper.getPojoPropertyDescriptors(getJavaClass(),
-            AffinityKeyMapped.class, true);
-
-        if (annotated == null || annotated.isEmpty())
-            return PropertyMappingHelper.getPojoPropertyDescriptors(getJavaClass(), true);
-
-        return annotated;
-    }
-
-    /**
-     * Looks up the candidate descriptors for the cluster key: the POJO descriptors which are not
-     * already used by the partition key.
-     *
-     * @param partKeyFields Partition key fields.
-     * @return POJO field descriptors for cluster key, or {@code null} if there are no descriptors
-     *      or their number equals the number of partition key fields.
-     */
-    private List<PropertyDescriptor> getClusterKeyDescriptors(List<PojoField> partKeyFields) {
-        List<PropertyDescriptor> candidates =
-            PropertyMappingHelper.getPojoPropertyDescriptors(getJavaClass(), true);
-
-        if (candidates == null || candidates.isEmpty())
-            return null;
-
-        if (partKeyFields.size() == candidates.size())
-            return null;
-
-        for (PojoField partField : partKeyFields) {
-            int idx = 0;
-
-            // Drop the first descriptor carrying the same name as the partition key field.
-            while (idx < candidates.size()) {
-                if (candidates.get(idx).getName().equals(partField.getName())) {
-                    candidates.remove(idx);
-
-                    break;
-                }
-
-                idx++;
-            }
-        }
-
-        return candidates;
-    }
-
-    /**
-     * Returns the first descendant element with the given tag name.
-     *
-     * @param parent Element to search in.
-     * @param tag Tag name.
-     * @return First matching element or {@code null} if there is none.
-     */
-    private static Element firstElementByTag(Element parent, String tag) {
-        NodeList found = parent.getElementsByTagName(tag);
-
-        return found != null ? (Element)found.item(0) : null;
-    }
-
-    /**
-     * Joins column names with {@code ", "}.
-     *
-     * @param cols Column names, may be {@code null}.
-     * @return Joined string, empty for {@code null} or empty list.
-     */
-    private static String commaSeparated(List<String> cols) {
-        StringBuilder joined = new StringBuilder();
-
-        if (cols != null) {
-            for (String col : cols) {
-                if (joined.length() != 0)
-                    joined.append(", ");
-
-                joined.append(col);
-            }
-        }
-
-        return joined.toString();
-    }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/231ead01/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyValuePersistenceSettings.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyValuePersistenceSettings.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyValuePersistenceSettings.java
deleted file mode 100644
index 2c43ed4..0000000
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/persistence/KeyValuePersistenceSettings.java
+++ /dev/null
@@ -1,478 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cache.store.cassandra.persistence;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.Serializable;
-import java.io.StringReader;
-import java.nio.charset.StandardCharsets;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Locale;
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.cache.store.cassandra.common.SystemHelper;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.springframework.core.io.Resource;
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-import org.w3c.dom.Node;
-import org.w3c.dom.NodeList;
-import org.xml.sax.InputSource;
-
-/**
- * Stores persistence settings for Ignite cache key and value
- */
-public class KeyValuePersistenceSettings implements Serializable {
- /**
- * Default Cassandra keyspace options which should be used to create new keyspace.
- * <ul>
- * <li> <b>SimpleStrategy</b> replication works well for a single data center Cassandra cluster.<br/>
- * If your Cassandra cluster is deployed across multiple data centers, it's better to use <b>NetworkTopologyStrategy</b>.
- * </li>
- * <li> Three replicas will be created for each data block. </li>
- * <li> Setting DURABLE_WRITES to true specifies that all data should be written to commit log. </li>
- * </ul>
- */
- private static final String DFLT_KEYSPACE_OPTIONS = "replication = {'class' : 'SimpleStrategy', " +
- "'replication_factor' : 3} and durable_writes = true";
-
- /** Xml attribute specifying Cassandra keyspace to use. */
- private static final String KEYSPACE_ATTR = "keyspace";
-
- /** Xml attribute specifying Cassandra table to use. */
- private static final String TABLE_ATTR = "table";
-
- /** Xml attribute specifying ttl (time to live) for rows inserted in Cassandra. */
- private static final String TTL_ATTR = "ttl";
-
- /** Root xml element containing persistence settings specification. */
- private static final String PERSISTENCE_NODE = "persistence";
-
- /** Xml element specifying Cassandra keyspace options. */
- private static final String KEYSPACE_OPTIONS_NODE = "keyspaceOptions";
-
- /** Xml element specifying Cassandra table options. */
- private static final String TABLE_OPTIONS_NODE = "tableOptions";
-
- /** Xml element specifying Ignite cache key persistence settings. */
- private static final String KEY_PERSISTENCE_NODE = "keyPersistence";
-
- /** Xml element specifying Ignite cache value persistence settings. */
- private static final String VALUE_PERSISTENCE_NODE = "valuePersistence";
-
- /** TTL (time to live) for rows inserted into Cassandra table, see <a href="https://docs.datastax.com/en/cql/3.1/cql/cql_using/use_expire_c.html">Expiring data</a>. */
- private Integer ttl;
-
- /** Cassandra keyspace (analog of tablespace in relational databases). */
- private String keyspace;
-
- /** Cassandra table. */
- private String tbl;
-
- /** Cassandra table creation options, see <a href="https://docs.datastax.com/en/cql/3.0/cql/cql_reference/create_table_r.html">CREATE TABLE</a>. */
- private String tblOptions;
-
- /** Cassandra keyspace creation options, see <a href="https://docs.datastax.com/en/cql/3.0/cql/cql_reference/create_keyspace_r.html">CREATE KEYSPACE</a>. */
- private String keyspaceOptions = DFLT_KEYSPACE_OPTIONS;
-
- /** Persistence settings for Ignite cache keys. */
- private KeyPersistenceSettings keyPersistenceSettings;
-
- /** Persistence settings for Ignite cache values. */
- private ValuePersistenceSettings valPersistenceSettings;
-
-    /**
-     * Creates Ignite cache key/value persistence settings from an XML string.
-     *
-     * @param settings XML text describing persistence settings for Ignite cache key/value.
-     */
-    @SuppressWarnings("UnusedDeclaration")
-    public KeyValuePersistenceSettings(String settings) {
-        // The XML is already in memory, so it goes straight to the parser.
-        this.init(settings);
-    }
-
-    /**
-     * Creates Ignite cache key/value persistence settings from an XML file.
-     * <p>
-     * The file is opened here; reading and closing the stream is done by {@code loadSettings}.
-     *
-     * @param settingsFile XML file with persistence settings for Ignite cache key/value.
-     */
-    @SuppressWarnings("UnusedDeclaration")
-    public KeyValuePersistenceSettings(File settingsFile) {
-        InputStream fileStream = null;
-
-        try {
-            fileStream = new FileInputStream(settingsFile);
-        }
-        catch (IOException err) {
-            String path = settingsFile.getAbsolutePath();
-
-            throw new IgniteException("Failed to get input stream for Cassandra persistence settings file: " + path, err);
-        }
-
-        String xml = loadSettings(fileStream);
-
-        init(xml);
-    }
-
-    /**
-     * Creates Ignite cache key/value persistence settings from a Spring resource.
-     * <p>
-     * The resource stream is obtained here; reading and closing it is done by {@code loadSettings}.
-     *
-     * @param settingsRsrc Resource containing XML with persistence settings for Ignite cache key/value.
-     */
-    public KeyValuePersistenceSettings(Resource settingsRsrc) {
-        InputStream rsrcStream = null;
-
-        try {
-            rsrcStream = settingsRsrc.getInputStream();
-        }
-        catch (IOException err) {
-            String msg = "Failed to get input stream for Cassandra persistence settings resource: " + settingsRsrc;
-
-            throw new IgniteException(msg, err);
-        }
-
-        String xml = loadSettings(rsrcStream);
-
-        init(xml);
-    }
-
-    /**
-     * Returns the TTL to use when inserting new rows into the Cassandra table.
-     *
-     * @return TTL, or {@code null} when the settings did not specify the {@code ttl} attribute.
-     */
-    public Integer getTTL() {
-        return this.ttl;
-    }
-
-    /**
-     * Returns the name of the Cassandra keyspace to use.
-     *
-     * @return Keyspace name as read from the {@code keyspace} attribute of the settings.
-     */
-    public String getKeyspace() {
-        return this.keyspace;
-    }
-
-    /**
-     * Returns the name of the Cassandra table to use.
-     *
-     * @return Table name as read from the {@code table} attribute of the settings.
-     */
-    public String getTable() {
-        return this.tbl;
-    }
-
-    /**
-     * Returns the keyspace-qualified name of the Cassandra table to use.
-     *
-     * @return Full table name in format "keyspace.table".
-     */
-    public String getTableFullName() {
-        StringBuilder fullName = new StringBuilder();
-
-        fullName.append(keyspace).append(".").append(tbl);
-
-        return fullName.toString();
-    }
-
-    /**
-     * Returns the persistence settings built from the {@code keyPersistence} element.
-     *
-     * @return Persistence settings for Ignite cache keys.
-     */
-    public KeyPersistenceSettings getKeyPersistenceSettings() {
-        return this.keyPersistenceSettings;
-    }
-
-    /**
-     * Returns the persistence settings built from the {@code valuePersistence} element.
-     *
-     * @return Persistence settings for Ignite cache values.
-     */
-    public ValuePersistenceSettings getValuePersistenceSettings() {
-        return this.valPersistenceSettings;
-    }
-
-    /**
-     * Returns list of POJO fields to be mapped to Cassandra table columns: key fields first,
-     * followed by value fields.
-     * <p>
-     * A side whose settings report no field list (initialization explicitly tolerates a {@code null}
-     * field list) simply contributes nothing, instead of failing with a {@link NullPointerException}.
-     *
-     * @return New list with key and value POJO fields, never {@code null}.
-     */
-    @SuppressWarnings("UnusedDeclaration")
-    public List<PojoField> getFields() {
-        List<PojoField> fields = new LinkedList<>();
-
-        List<PojoField> keyFields = keyPersistenceSettings.getFields();
-
-        if (keyFields != null)
-            fields.addAll(keyFields);
-
-        List<PojoField> valFields = valPersistenceSettings.getFields();
-
-        if (valFields != null)
-            fields.addAll(valFields);
-
-        return fields;
-    }
-
-    /**
-     * Returns the Ignite cache key POJO fields to be mapped to Cassandra table columns.
-     * <p>
-     * The list is handed out exactly as provided by the key persistence settings (no copy is made).
-     *
-     * @return POJO fields list.
-     */
-    @SuppressWarnings("UnusedDeclaration")
-    public List<PojoField> getKeyFields() {
-        List<PojoField> keyFields = keyPersistenceSettings.getFields();
-
-        return keyFields;
-    }
-
-    /**
-     * Returns the Ignite cache value POJO fields to be mapped to Cassandra table columns.
-     * <p>
-     * The list is handed out exactly as provided by the value persistence settings (no copy is made).
-     *
-     * @return POJO fields list.
-     */
-    @SuppressWarnings("UnusedDeclaration")
-    public List<PojoField> getValueFields() {
-        List<PojoField> valFields = valPersistenceSettings.getFields();
-
-        return valFields;
-    }
-
-    /**
-     * Returns DDL statement to create Cassandra keyspace.
-     * <p>
-     * Keyspace options are appended after a {@code with} keyword unless they already start with one.
-     * Missing or whitespace-only options are skipped, so no dangling {@code with} is generated.
-     * Runs of spaces are collapsed and a terminating {@code ;} is added when absent.
-     *
-     * @return Keyspace DDL statement.
-     */
-    public String getKeyspaceDDLStatement() {
-        StringBuilder builder = new StringBuilder();
-        builder.append("create keyspace if not exists ").append(keyspace);
-
-        String options = keyspaceOptions == null ? "" : keyspaceOptions.trim();
-
-        if (!options.isEmpty()) {
-            // Locale.ROOT keeps the check stable under locales with special case mappings
-            // (e.g. Turkish, where "WITH".toLowerCase() does not produce "with").
-            if (!options.toLowerCase(Locale.ROOT).startsWith("with"))
-                builder.append("\nwith");
-
-            builder.append(" ").append(options);
-        }
-
-        String statement = builder.toString().trim().replaceAll(" +", " ");
-
-        return statement.endsWith(";") ? statement : statement + ";";
-    }
-
-    /**
-     * Returns DDL statement to create Cassandra table.
-     * <p>
-     * The statement lists key columns, value columns and the primary key definition. Table options and
-     * the clustering definition from the key settings (when present) are joined with {@code and} and
-     * placed after a {@code with} keyword unless the options already start with one. Runs of spaces are
-     * collapsed and a terminating {@code ;} is added when absent.
-     *
-     * @return Table DDL statement.
-     */
-    public String getTableDDLStatement() {
-        String colsDDL = keyPersistenceSettings.getTableColumnsDDL() + ",\n" + valPersistenceSettings.getTableColumnsDDL();
-
-        String primaryKeyDDL = keyPersistenceSettings.getPrimaryKeyDDL();
-
-        String clusteringDDL = keyPersistenceSettings.getClusteringDDL();
-
-        String optionsDDL = tblOptions == null ? "" : tblOptions.trim();
-
-        if (clusteringDDL != null && !clusteringDDL.isEmpty())
-            optionsDDL = optionsDDL.isEmpty() ? clusteringDDL : optionsDDL + " and " + clusteringDDL;
-
-        optionsDDL = optionsDDL.trim();
-
-        // Locale.ROOT keeps the check stable under locales with special case mappings
-        // (e.g. Turkish, where "WITH".toLowerCase() does not produce "with").
-        if (!optionsDDL.isEmpty() && !optionsDDL.toLowerCase(Locale.ROOT).startsWith("with"))
-            optionsDDL = "with " + optionsDDL;
-
-        StringBuilder builder = new StringBuilder();
-
-        builder.append("create table if not exists ").append(keyspace).append(".").append(tbl);
-        builder.append("\n(\n").append(colsDDL).append(",\n").append(primaryKeyDDL).append("\n)");
-
-        if (!optionsDDL.isEmpty())
-            builder.append(" \n").append(optionsDDL);
-
-        String tblDDL = builder.toString().trim().replaceAll(" +", " ");
-
-        return tblDDL.endsWith(";") ? tblDDL : tblDDL + ";";
-    }
-
-    /**
-     * Returns DDL statements to create Cassandra table secondary indexes, one per value field
-     * flagged as indexed.
-     * <p>
-     * When the value persistence settings report no field list (initialization explicitly tolerates
-     * a {@code null} field list), an empty list is returned.
-     *
-     * @return DDL statements to create secondary indexes, never {@code null}.
-     */
-    public List<String> getIndexDDLStatements() {
-        List<String> idxDDLs = new LinkedList<>();
-
-        List<PojoField> fields = valPersistenceSettings.getFields();
-
-        if (fields == null)
-            return idxDDLs;
-
-        for (PojoField field : fields) {
-            PojoValueField valField = (PojoValueField)field;
-
-            if (valField.isIndexed())
-                idxDDLs.add(valField.getIndexDDL(keyspace, tbl));
-        }
-
-        return idxDDLs;
-    }
-
-    /**
-     * Loads Ignite cache persistence settings from the stream, then closes it.
-     * <p>
-     * The stream is decoded as UTF-8 (the default encoding of XML documents) rather than with the
-     * platform default charset, so the same settings file yields the same text on every node.
-     * Lines are re-joined with {@code SystemHelper.LINE_SEPARATOR}.
-     *
-     * @param in Input stream; always closed by this method.
-     * @return String containing xml with Ignite cache persistence settings.
-     * @throws IgniteException If the stream can't be read.
-     */
-    private String loadSettings(InputStream in) {
-        StringBuilder settings = new StringBuilder();
-        BufferedReader reader = null;
-
-        try {
-            reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
-
-            for (String line = reader.readLine(); line != null; line = reader.readLine()) {
-                if (settings.length() != 0)
-                    settings.append(SystemHelper.LINE_SEPARATOR);
-
-                settings.append(line);
-            }
-        }
-        catch (Throwable e) {
-            throw new IgniteException("Failed to read input stream for Cassandra persistence settings", e);
-        }
-        finally {
-            // The reader may not have been created if wrapping the stream failed, so close both.
-            U.closeQuiet(reader);
-            U.closeQuiet(in);
-        }
-
-        return settings.toString();
-    }
-
-    /**
-     * Reads an attribute of the element and parses its trimmed value as an integer.
-     *
-     * @param elem Element with data.
-     * @param attr Attribute name.
-     * @return Numeric value for specified attribute.
-     * @throws IllegalArgumentException If the attribute value is not a valid integer; the original
-     *      {@link NumberFormatException} is kept as the cause.
-     */
-    private int extractIntAttribute(Element elem, String attr) {
-        String val = elem.getAttribute(attr).trim();
-
-        try {
-            return Integer.parseInt(val);
-        }
-        catch (NumberFormatException e) {
-            throw new IllegalArgumentException("Incorrect value '" + val + "' specified for '" + attr + "' attribute", e);
-        }
-    }
-
-    /**
-     * Initializes persistence settings from XML string.
-     * <p>
-     * Expects a {@code persistence} root element with non-empty {@code keyspace} and {@code table}
-     * attributes, an optional integer {@code ttl} attribute, optional {@code tableOptions} and
-     * {@code keyspaceOptions} child elements, and mandatory {@code keyPersistence} and
-     * {@code valuePersistence} child elements. Other child elements are ignored.
-     *
-     * @param settings XML string containing Ignite cache persistence settings configuration.
-     * @throws IllegalArgumentException If the XML can't be parsed or describes invalid settings.
-     */
-    @SuppressWarnings("IfCanBeSwitch")
-    private void init(String settings) {
-        Element root = parseRootElement(settings);
-
-        if (!PERSISTENCE_NODE.equals(root.getNodeName())) {
-            throw new IllegalArgumentException("Incorrect persistence settings specified. " +
-                "Root XML element should be 'persistence'");
-        }
-
-        keyspace = extractRequiredAttribute(root, KEYSPACE_ATTR);
-        tbl = extractRequiredAttribute(root, TABLE_ATTR);
-
-        if (root.hasAttribute(TTL_ATTR))
-            ttl = extractIntAttribute(root, TTL_ATTR);
-
-        if (!root.hasChildNodes()) {
-            throw new IllegalArgumentException("Incorrect Cassandra persistence settings specification, " +
-                "there are no key and value persistence settings specified");
-        }
-
-        NodeList children = root.getChildNodes();
-        int cnt = children.getLength();
-
-        for (int i = 0; i < cnt; i++) {
-            Node node = children.item(i);
-
-            // Skip text, comments and other non-element nodes.
-            if (node.getNodeType() != Node.ELEMENT_NODE)
-                continue;
-
-            Element el = (Element)node;
-            String nodeName = el.getNodeName();
-
-            if (nodeName.equals(TABLE_OPTIONS_NODE))
-                tblOptions = flattenWhitespace(el.getTextContent());
-            else if (nodeName.equals(KEYSPACE_OPTIONS_NODE))
-                keyspaceOptions = flattenWhitespace(el.getTextContent());
-            else if (nodeName.equals(KEY_PERSISTENCE_NODE))
-                keyPersistenceSettings = new KeyPersistenceSettings(el);
-            else if (nodeName.equals(VALUE_PERSISTENCE_NODE))
-                valPersistenceSettings = new ValuePersistenceSettings(el);
-        }
-
-        if (keyPersistenceSettings == null) {
-            throw new IllegalArgumentException("Incorrect Cassandra persistence settings specification, " +
-                "there are no key persistence settings specified");
-        }
-
-        if (valPersistenceSettings == null) {
-            throw new IllegalArgumentException("Incorrect Cassandra persistence settings specification, " +
-                "there are no value persistence settings specified");
-        }
-
-        List<PojoField> keyFields = keyPersistenceSettings.getFields();
-        List<PojoField> valFields = valPersistenceSettings.getFields();
-
-        if (PersistenceStrategy.POJO.equals(keyPersistenceSettings.getStrategy()) &&
-            (keyFields == null || keyFields.isEmpty())) {
-            throw new IllegalArgumentException("Incorrect Cassandra persistence settings specification, " +
-                "there are no key fields found");
-        }
-
-        if (PersistenceStrategy.POJO.equals(valPersistenceSettings.getStrategy()) &&
-            (valFields == null || valFields.isEmpty())) {
-            throw new IllegalArgumentException("Incorrect Cassandra persistence settings specification, " +
-                "there are no value fields found");
-        }
-
-        // Column overlap can only be checked when both sides actually expose fields.
-        if (keyFields == null || keyFields.isEmpty() || valFields == null || valFields.isEmpty())
-            return;
-
-        checkNoSharedColumns(keyFields, valFields);
-    }
-
-    /**
-     * Parses the settings XML and returns its root element.
-     * <p>
-     * NOTE(review): the parser is created with default {@link DocumentBuilderFactory} features, i.e.
-     * DTD and external entity processing are not explicitly disabled. Consider hardening it if
-     * settings may come from untrusted sources.
-     *
-     * @param settings XML string to parse.
-     * @return Root element of the parsed document.
-     * @throws IllegalArgumentException If the XML can't be parsed.
-     */
-    private Element parseRootElement(String settings) {
-        try {
-            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
-            DocumentBuilder builder = factory.newDocumentBuilder();
-            Document doc = builder.parse(new InputSource(new StringReader(settings)));
-
-            return doc.getDocumentElement();
-        }
-        catch (Throwable e) {
-            throw new IllegalArgumentException("Failed to parse persistence settings:" +
-                SystemHelper.LINE_SEPARATOR + settings, e);
-        }
-    }
-
-    /**
-     * Reads a mandatory attribute of the root element.
-     *
-     * @param root Root element of persistence settings.
-     * @param attr Attribute name.
-     * @return Trimmed, non-empty attribute value.
-     * @throws IllegalArgumentException If the attribute is missing or blank.
-     */
-    private String extractRequiredAttribute(Element root, String attr) {
-        if (!root.hasAttribute(attr)) {
-            throw new IllegalArgumentException("Incorrect persistence settings '" + attr +
-                "' attribute should be specified");
-        }
-
-        String val = root.getAttribute(attr).trim();
-
-        // A blank keyspace/table name would only surface later as an invalid DDL statement.
-        if (val.isEmpty()) {
-            throw new IllegalArgumentException("Incorrect persistence settings '" + attr +
-                "' attribute should not be empty");
-        }
-
-        return val;
-    }
-
-    /**
-     * Puts multi-line options text onto a single line.
-     *
-     * @param text Text content of an options element.
-     * @return Text with line feeds and tabs replaced by spaces and carriage returns removed.
-     */
-    private String flattenWhitespace(String text) {
-        return text.replace("\n", " ").replace("\r", "").replace("\t", " ");
-    }
-
-    /**
-     * Makes sure that no Cassandra column is used both by a key field and by a value field.
-     *
-     * @param keyFields Key POJO fields.
-     * @param valFields Value POJO fields.
-     * @throws IllegalArgumentException If a key column is also specified as a value column.
-     */
-    private void checkNoSharedColumns(List<PojoField> keyFields, List<PojoField> valFields) {
-        for (PojoField keyField : keyFields) {
-            for (PojoField valField : valFields) {
-                if (keyField.getColumn().equals(valField.getColumn())) {
-                    throw new IllegalArgumentException("Incorrect Cassandra persistence settings specification, " +
-                        "key column '" + keyField.getColumn() + "' also specified as a value column");
-                }
-            }
-        }
-    }
-}