You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/09/15 06:40:05 UTC
[07/40] ignite git commit: IGNITE-3314 Implement Serializable in
org.apache.ignite.cache.store.cassandra.datasource.DataSource. Fixes #974.
IGNITE-3314 Implement Serializable in org.apache.ignite.cache.store.cassandra.datasource.DataSource. Fixes #974.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2ba74af4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2ba74af4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2ba74af4
Branch: refs/heads/ignite-comm-opts1
Commit: 2ba74af4ebfc66631b7f0cd764c7493952da4112
Parents: 799b8ec
Author: Igor Rudyak <ir...@gmail.com>
Authored: Tue Sep 13 18:07:10 2016 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Tue Sep 13 18:07:10 2016 +0700
----------------------------------------------------------------------
.../store/cassandra/datasource/Credentials.java | 4 +-
.../store/cassandra/datasource/DataSource.java | 119 ++++++++++++--
.../cassandra/datasource/PlainCredentials.java | 3 +
.../tests/DatasourceSerializationTest.java | 158 +++++++++++++++++++
.../tests/utils/CassandraAdminCredentials.java | 6 +-
.../utils/CassandraRegularCredentials.java | 6 +-
6 files changed, 280 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/2ba74af4/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/Credentials.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/Credentials.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/Credentials.java
index e1fd60c..a2358a6 100644
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/Credentials.java
+++ b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/Credentials.java
@@ -17,10 +17,12 @@
package org.apache.ignite.cache.store.cassandra.datasource;
+import java.io.Serializable;
+
/**
* Provides credentials for Cassandra (instead of specifying user/password directly in Spring context XML).
*/
-public interface Credentials {
+public interface Credentials extends Serializable {
/**
* Returns user name
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/2ba74af4/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java
index 1ecb28f..f582aac 100644
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java
+++ b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java
@@ -31,19 +31,37 @@ import com.datastax.driver.core.policies.LoadBalancingPolicy;
import com.datastax.driver.core.policies.ReconnectionPolicy;
import com.datastax.driver.core.policies.RetryPolicy;
import com.datastax.driver.core.policies.SpeculativeExecutionPolicy;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.LinkedList;
import java.util.List;
+import java.util.UUID;
+
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.store.cassandra.session.CassandraSession;
import org.apache.ignite.cache.store.cassandra.session.CassandraSessionImpl;
+import org.apache.ignite.internal.util.typedef.internal.U;
/**
* Data source abstraction to specify configuration of the Cassandra session to be used.
*/
-public class DataSource {
+public class DataSource implements Externalizable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Placeholder written in place of Cassandra connection options which are null or
+ * don't support serialization (RetryPolicy, LoadBalancingPolicy, etc.).
+ */
+ private static final UUID NULL_OBJECT = UUID.fromString("45ffae47-3193-5910-84a2-048fe65735d9");
+
/** Number of rows to immediately fetch in CQL statement execution. */
private Integer fetchSize;
@@ -324,7 +342,7 @@ public class DataSource {
* @param plc Load balancing policy.
*/
public void setLoadBalancingPolicy(LoadBalancingPolicy plc) {
- this.loadBalancingPlc = plc;
+ loadBalancingPlc = plc;
invalidate();
}
@@ -336,7 +354,7 @@ public class DataSource {
*/
@SuppressWarnings("UnusedDeclaration")
public void setReconnectionPolicy(ReconnectionPolicy plc) {
- this.reconnectionPlc = plc;
+ reconnectionPlc = plc;
invalidate();
}
@@ -348,7 +366,7 @@ public class DataSource {
*/
@SuppressWarnings("UnusedDeclaration")
public void setRetryPolicy(RetryPolicy plc) {
- this.retryPlc = plc;
+ retryPlc = plc;
invalidate();
}
@@ -360,7 +378,7 @@ public class DataSource {
*/
@SuppressWarnings("UnusedDeclaration")
public void setAddressTranslator(AddressTranslator translator) {
- this.addrTranslator = translator;
+ addrTranslator = translator;
invalidate();
}
@@ -372,7 +390,7 @@ public class DataSource {
*/
@SuppressWarnings("UnusedDeclaration")
public void setSpeculativeExecutionPolicy(SpeculativeExecutionPolicy plc) {
- this.speculativeExecutionPlc = plc;
+ speculativeExecutionPlc = plc;
invalidate();
}
@@ -384,7 +402,7 @@ public class DataSource {
*/
@SuppressWarnings("UnusedDeclaration")
public void setAuthProvider(AuthProvider provider) {
- this.authProvider = provider;
+ authProvider = provider;
invalidate();
}
@@ -396,7 +414,7 @@ public class DataSource {
*/
@SuppressWarnings("UnusedDeclaration")
public void setSslOptions(SSLOptions options) {
- this.sslOptions = options;
+ sslOptions = options;
invalidate();
}
@@ -408,7 +426,7 @@ public class DataSource {
*/
@SuppressWarnings("UnusedDeclaration")
public void setPoolingOptions(PoolingOptions options) {
- this.poolingOptions = options;
+ poolingOptions = options;
invalidate();
}
@@ -420,7 +438,7 @@ public class DataSource {
*/
@SuppressWarnings("UnusedDeclaration")
public void setSocketOptions(SocketOptions options) {
- this.sockOptions = options;
+ sockOptions = options;
invalidate();
}
@@ -432,7 +450,7 @@ public class DataSource {
*/
@SuppressWarnings("UnusedDeclaration")
public void setNettyOptions(NettyOptions options) {
- this.nettyOptions = options;
+ nettyOptions = options;
invalidate();
}
@@ -522,6 +540,99 @@ public class DataSource {
return ses = new CassandraSessionImpl(builder, fetchSize, readConsistency, writeConsistency, log);
}
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Plain settings are written directly; driver option objects go through the private write helper,
+ * which substitutes {@code NULL_OBJECT} for values that are null or not {@link Serializable}.
+ * The field order here must match {@link #readExternal(ObjectInput)} exactly.
+ */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(fetchSize);
+ out.writeObject(readConsistency);
+ out.writeObject(writeConsistency);
+ U.writeString(out, user);
+ U.writeString(out, pwd);
+ out.writeObject(port);
+ out.writeObject(contactPoints);
+ out.writeObject(contactPointsWithPorts);
+ out.writeObject(maxSchemaAgreementWaitSeconds);
+ out.writeObject(protoVer);
+ U.writeString(out, compression);
+ out.writeObject(useSSL);
+ out.writeObject(collectMetrix);
+ out.writeObject(jmxReporting);
+ out.writeObject(creds);
+ writeObject(out, loadBalancingPlc);
+ writeObject(out, reconnectionPlc);
+ // The retry policy has to travel with the other policies, otherwise it is lost after deserialization.
+ writeObject(out, retryPlc);
+ writeObject(out, addrTranslator);
+ writeObject(out, speculativeExecutionPlc);
+ writeObject(out, authProvider);
+ writeObject(out, sslOptions);
+ writeObject(out, poolingOptions);
+ writeObject(out, sockOptions);
+ writeObject(out, nettyOptions);
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Reads fields in exactly the order {@link #writeExternal(ObjectOutput)} wrote them; the private
+ * read helper maps the {@code NULL_OBJECT} placeholder back to {@code null}.
+ */
+ @SuppressWarnings("unchecked")
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ fetchSize = (Integer)in.readObject();
+ readConsistency = (ConsistencyLevel)in.readObject();
+ writeConsistency = (ConsistencyLevel)in.readObject();
+ user = U.readString(in);
+ pwd = U.readString(in);
+ port = (Integer)in.readObject();
+ contactPoints = (List<InetAddress>)in.readObject();
+ contactPointsWithPorts = (List<InetSocketAddress>)in.readObject();
+ maxSchemaAgreementWaitSeconds = (Integer)in.readObject();
+ protoVer = (Integer)in.readObject();
+ compression = U.readString(in);
+ useSSL = (Boolean)in.readObject();
+ collectMetrix = (Boolean)in.readObject();
+ jmxReporting = (Boolean)in.readObject();
+ creds = (Credentials)in.readObject();
+ loadBalancingPlc = (LoadBalancingPolicy)readObject(in);
+ reconnectionPlc = (ReconnectionPolicy)readObject(in);
+ retryPlc = (RetryPolicy)readObject(in);
+ addrTranslator = (AddressTranslator)readObject(in);
+ speculativeExecutionPlc = (SpeculativeExecutionPolicy)readObject(in);
+ authProvider = (AuthProvider)readObject(in);
+ sslOptions = (SSLOptions)readObject(in);
+ poolingOptions = (PoolingOptions)readObject(in);
+ sockOptions = (SocketOptions)readObject(in);
+ nettyOptions = (NettyOptions)readObject(in);
+ }
+
+ /**
+ * Writes a single member, replacing it with the {@code NULL_OBJECT} placeholder when it cannot be serialized.
+ * @param out stream receiving the value
+ * @param obj member to write; may be null or non-serializable
+ * @throws IOException if the underlying stream fails
+ */
+ private void writeObject(ObjectOutput out, Object obj) throws IOException {
+ out.writeObject(obj instanceof Serializable ? obj : NULL_OBJECT);
+ }
+
+ /**
+ * Reads one member written by the matching write helper, mapping the NULL_OBJECT placeholder back to {@code null}.
+ * @param in the stream to read data from in order to restore the object
+ * @throws IOException Includes any I/O exceptions that may occur
+ * @throws ClassNotFoundException If the class for an object being restored cannot be found
+ * @return deserialized object, or {@code null} if the placeholder was read
+ */
+ private Object readObject(ObjectInput in) throws IOException, ClassNotFoundException {
+ Object obj = in.readObject();
+ return NULL_OBJECT.equals(obj) ? null : obj;
+ }
+
/**
* Parses consistency level provided as string.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/2ba74af4/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/PlainCredentials.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/PlainCredentials.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/PlainCredentials.java
index 9d0710e..46ebdc5 100644
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/PlainCredentials.java
+++ b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/PlainCredentials.java
@@ -21,6 +21,9 @@ package org.apache.ignite.cache.store.cassandra.datasource;
* Simple implementation of {@link Credentials} which just uses its constructor to hold user/password values.
*/
public class PlainCredentials implements Credentials {
+ /** */
+ private static final long serialVersionUID = 0L;
+
/** User name. */
private String user;
http://git-wip-us.apache.org/repos/asf/ignite/blob/2ba74af4/modules/cassandra/src/test/java/org/apache/ignite/tests/DatasourceSerializationTest.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/test/java/org/apache/ignite/tests/DatasourceSerializationTest.java b/modules/cassandra/src/test/java/org/apache/ignite/tests/DatasourceSerializationTest.java
new file mode 100644
index 0000000..ceb90e0
--- /dev/null
+++ b/modules/cassandra/src/test/java/org/apache/ignite/tests/DatasourceSerializationTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
+import com.datastax.driver.core.policies.RoundRobinPolicy;
+import com.datastax.driver.core.policies.TokenAwarePolicy;
+
+import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.ignite.cache.store.cassandra.datasource.Credentials;
+import org.apache.ignite.cache.store.cassandra.datasource.DataSource;
+import org.apache.ignite.cache.store.cassandra.serializer.JavaSerializer;
+import org.apache.ignite.tests.utils.CassandraAdminCredentials;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for datasource serialization.
+ */
+public class DatasourceSerializationTest {
+ /**
+ * Sample serializable policy for the test; its transient delegate is not restored on deserialization, so only the type is checked.
+ */
+ private static class MyLoadBalancingPolicy implements LoadBalancingPolicy, Serializable {
+ /** */
+ private transient LoadBalancingPolicy plc = new TokenAwarePolicy(new RoundRobinPolicy());
+
+ /** {@inheritDoc} */
+ @Override public void init(Cluster cluster, Collection<Host> hosts) {
+ plc.init(cluster, hosts);
+ }
+
+ /** {@inheritDoc} */
+ @Override public HostDistance distance(Host host) {
+ return plc.distance(host);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Iterator<Host> newQueryPlan(String loggedKeyspace, Statement statement) {
+ return plc.newQueryPlan(loggedKeyspace, statement);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAdd(Host host) {
+ plc.onAdd(host);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onUp(Host host) {
+ plc.onUp(host);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onDown(Host host) {
+ plc.onDown(host);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onRemove(Host host) {
+ plc.onRemove(host);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ plc.close();
+ }
+ }
+
+ /**
+ * Serialization test.
+ */
+ @Test
+ public void serializationTest() {
+ DataSource src = new DataSource();
+
+ Credentials cred = new CassandraAdminCredentials();
+ String[] points = new String[]{"127.0.0.1", "10.0.0.2", "10.0.0.3"};
+ LoadBalancingPolicy plc = new MyLoadBalancingPolicy();
+
+ src.setCredentials(cred);
+ src.setContactPoints(points);
+ src.setReadConsistency("ONE");
+ src.setWriteConsistency("QUORUM");
+ src.setLoadBalancingPolicy(plc);
+
+ JavaSerializer serializer = new JavaSerializer();
+
+ ByteBuffer buff = serializer.serialize(src);
+ DataSource _src = (DataSource)serializer.deserialize(buff);
+
+ Credentials _cred = (Credentials)getFieldValue(_src, "creds");
+ List<InetAddress> _points = (List<InetAddress>)getFieldValue(_src, "contactPoints");
+ ConsistencyLevel _readCons = (ConsistencyLevel)getFieldValue(_src, "readConsistency");
+ ConsistencyLevel _writeCons = (ConsistencyLevel)getFieldValue(_src, "writeConsistency");
+ LoadBalancingPolicy _plc = (LoadBalancingPolicy)getFieldValue(_src, "loadBalancingPlc");
+
+ assertTrue("Incorrectly serialized/deserialized credentials for Cassandra DataSource",
+ cred.getPassword().equals(_cred.getPassword()) && cred.getUser().equals(_cred.getUser()));
+
+ assertTrue("Incorrectly serialized/deserialized contact points for Cassandra DataSource",
+ "/127.0.0.1".equals(_points.get(0).toString()) &&
+ "/10.0.0.2".equals(_points.get(1).toString()) &&
+ "/10.0.0.3".equals(_points.get(2).toString()));
+
+ assertTrue("Incorrectly serialized/deserialized consistency levels for Cassandra DataSource",
+ ConsistencyLevel.ONE == _readCons && ConsistencyLevel.QUORUM == _writeCons);
+
+ assertTrue("Incorrectly serialized/deserialized load balancing policy for Cassandra DataSource",
+ _plc instanceof MyLoadBalancingPolicy);
+ }
+
+ /**
+ * @param obj Object.
+ * @param field Field name.
+ * @return Field value.
+ */
+ private Object getFieldValue(Object obj, String field) {
+ try {
+ Field f = obj.getClass().getDeclaredField(field);
+
+ f.setAccessible(true);
+
+ return f.get(obj);
+ }
+ catch (Throwable e) {
+ throw new RuntimeException("Failed to get field '" + field + "' value", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2ba74af4/modules/cassandra/src/test/java/org/apache/ignite/tests/utils/CassandraAdminCredentials.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/test/java/org/apache/ignite/tests/utils/CassandraAdminCredentials.java b/modules/cassandra/src/test/java/org/apache/ignite/tests/utils/CassandraAdminCredentials.java
index 66df6e7..e7047f3 100644
--- a/modules/cassandra/src/test/java/org/apache/ignite/tests/utils/CassandraAdminCredentials.java
+++ b/modules/cassandra/src/test/java/org/apache/ignite/tests/utils/CassandraAdminCredentials.java
@@ -20,10 +20,12 @@ package org.apache.ignite.tests.utils;
import org.apache.ignite.cache.store.cassandra.datasource.Credentials;
/**
- * Implementation of {@link org.apache.ignite.cache.store.cassandra.datasource.Credentials}
- * providing admin user/password to establish Cassandra session.
+ * Implementation of {@link Credentials} providing admin user/password to establish Cassandra session.
*/
public class CassandraAdminCredentials implements Credentials {
+ /** */
+ private static final long serialVersionUID = 0L;
+
/** {@inheritDoc} */
@Override public String getUser() {
return CassandraHelper.getAdminUser();
http://git-wip-us.apache.org/repos/asf/ignite/blob/2ba74af4/modules/cassandra/src/test/java/org/apache/ignite/tests/utils/CassandraRegularCredentials.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/test/java/org/apache/ignite/tests/utils/CassandraRegularCredentials.java b/modules/cassandra/src/test/java/org/apache/ignite/tests/utils/CassandraRegularCredentials.java
index 52937ea..7546c9b 100644
--- a/modules/cassandra/src/test/java/org/apache/ignite/tests/utils/CassandraRegularCredentials.java
+++ b/modules/cassandra/src/test/java/org/apache/ignite/tests/utils/CassandraRegularCredentials.java
@@ -20,10 +20,12 @@ package org.apache.ignite.tests.utils;
import org.apache.ignite.cache.store.cassandra.datasource.Credentials;
/**
- * Implementation of {@link org.apache.ignite.cache.store.cassandra.datasource.Credentials}
- * providing regular user/password to establish Cassandra session.
+ * Implementation of {@link Credentials} providing regular user/password to establish Cassandra session.
*/
public class CassandraRegularCredentials implements Credentials {
+ /** */
+ private static final long serialVersionUID = 0L;
+
/** {@inheritDoc} */
@Override public String getUser() {
return CassandraHelper.getRegularUser();