You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2016/09/16 05:49:30 UTC

[16/50] [abbrv] ignite git commit: IGNITE-3314 Implement Serializable in org.apache.ignite.cache.store.cassandra.datasource.DataSource. Fixes #974.

IGNITE-3314 Implement Serializable in org.apache.ignite.cache.store.cassandra.datasource.DataSource. Fixes #974.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2ba74af4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2ba74af4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2ba74af4

Branch: refs/heads/ignite-3443
Commit: 2ba74af4ebfc66631b7f0cd764c7493952da4112
Parents: 799b8ec
Author: Igor Rudyak <ir...@gmail.com>
Authored: Tue Sep 13 18:07:10 2016 +0700
Committer: Alexey Kuznetsov <ak...@apache.org>
Committed: Tue Sep 13 18:07:10 2016 +0700

----------------------------------------------------------------------
 .../store/cassandra/datasource/Credentials.java |   4 +-
 .../store/cassandra/datasource/DataSource.java  | 119 ++++++++++++--
 .../cassandra/datasource/PlainCredentials.java  |   3 +
 .../tests/DatasourceSerializationTest.java      | 158 +++++++++++++++++++
 .../tests/utils/CassandraAdminCredentials.java  |   6 +-
 .../utils/CassandraRegularCredentials.java      |   6 +-
 6 files changed, 280 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2ba74af4/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/Credentials.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/Credentials.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/Credentials.java
index e1fd60c..a2358a6 100644
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/Credentials.java
+++ b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/Credentials.java
@@ -17,10 +17,12 @@
 
 package org.apache.ignite.cache.store.cassandra.datasource;
 
+import java.io.Serializable;
+
 /**
  * Provides credentials for Cassandra (instead of specifying user/password directly in Spring context XML).
  */
-public interface Credentials {
+public interface Credentials extends Serializable {
     /**
      * Returns user name
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ba74af4/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java
index 1ecb28f..f582aac 100644
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java
+++ b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/DataSource.java
@@ -31,19 +31,37 @@ import com.datastax.driver.core.policies.LoadBalancingPolicy;
 import com.datastax.driver.core.policies.ReconnectionPolicy;
 import com.datastax.driver.core.policies.RetryPolicy;
 import com.datastax.driver.core.policies.SpeculativeExecutionPolicy;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.io.Serializable;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.UUID;
+
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.store.cassandra.session.CassandraSession;
 import org.apache.ignite.cache.store.cassandra.session.CassandraSessionImpl;
+import org.apache.ignite.internal.util.typedef.internal.U;
 
 /**
  * Data source abstraction to specify configuration of the Cassandra session to be used.
  */
-public class DataSource {
+public class DataSource implements Externalizable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Null object, used as a replacement for those Cassandra connection options which
+     * don't support serialization (RetryPolicy, LoadBalancingPolicy, etc.).
+     */
+    private static final UUID NULL_OBJECT = UUID.fromString("45ffae47-3193-5910-84a2-048fe65735d9");
+
     /** Number of rows to immediately fetch in CQL statement execution. */
     private Integer fetchSize;
 
@@ -324,7 +342,7 @@ public class DataSource {
      * @param plc Load balancing policy.
      */
     public void setLoadBalancingPolicy(LoadBalancingPolicy plc) {
-        this.loadBalancingPlc = plc;
+        loadBalancingPlc = plc;
 
         invalidate();
     }
@@ -336,7 +354,7 @@ public class DataSource {
      */
     @SuppressWarnings("UnusedDeclaration")
     public void setReconnectionPolicy(ReconnectionPolicy plc) {
-        this.reconnectionPlc = plc;
+        reconnectionPlc = plc;
 
         invalidate();
     }
@@ -348,7 +366,7 @@ public class DataSource {
      */
     @SuppressWarnings("UnusedDeclaration")
     public void setRetryPolicy(RetryPolicy plc) {
-        this.retryPlc = plc;
+        retryPlc = plc;
 
         invalidate();
     }
@@ -360,7 +378,7 @@ public class DataSource {
      */
     @SuppressWarnings("UnusedDeclaration")
     public void setAddressTranslator(AddressTranslator translator) {
-        this.addrTranslator = translator;
+        addrTranslator = translator;
 
         invalidate();
     }
@@ -372,7 +390,7 @@ public class DataSource {
      */
     @SuppressWarnings("UnusedDeclaration")
     public void setSpeculativeExecutionPolicy(SpeculativeExecutionPolicy plc) {
-        this.speculativeExecutionPlc = plc;
+        speculativeExecutionPlc = plc;
 
         invalidate();
     }
@@ -384,7 +402,7 @@ public class DataSource {
      */
     @SuppressWarnings("UnusedDeclaration")
     public void setAuthProvider(AuthProvider provider) {
-        this.authProvider = provider;
+        authProvider = provider;
 
         invalidate();
     }
@@ -396,7 +414,7 @@ public class DataSource {
      */
     @SuppressWarnings("UnusedDeclaration")
     public void setSslOptions(SSLOptions options) {
-        this.sslOptions = options;
+        sslOptions = options;
 
         invalidate();
     }
@@ -408,7 +426,7 @@ public class DataSource {
      */
     @SuppressWarnings("UnusedDeclaration")
     public void setPoolingOptions(PoolingOptions options) {
-        this.poolingOptions = options;
+        poolingOptions = options;
 
         invalidate();
     }
@@ -420,7 +438,7 @@ public class DataSource {
      */
     @SuppressWarnings("UnusedDeclaration")
     public void setSocketOptions(SocketOptions options) {
-        this.sockOptions = options;
+        sockOptions = options;
 
         invalidate();
     }
@@ -432,7 +450,7 @@ public class DataSource {
      */
     @SuppressWarnings("UnusedDeclaration")
     public void setNettyOptions(NettyOptions options) {
-        this.nettyOptions = options;
+        nettyOptions = options;
 
         invalidate();
     }
@@ -522,6 +540,97 @@ public class DataSource {
         return ses = new CassandraSessionImpl(builder, fetchSize, readConsistency, writeConsistency, log);
     }
 
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The write order must be mirrored exactly by {@link #readExternal(ObjectInput)}. Driver option objects
+     * go through the {@code writeObject(ObjectOutput, Object)} helper, which writes a marker in place of
+     * values that are {@code null} or not {@link Serializable}.
+     */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(fetchSize);
+        out.writeObject(readConsistency);
+        out.writeObject(writeConsistency);
+        U.writeString(out, user);
+        U.writeString(out, pwd);
+        out.writeObject(port);
+        out.writeObject(contactPoints);
+        out.writeObject(contactPointsWithPorts);
+        out.writeObject(maxSchemaAgreementWaitSeconds);
+        out.writeObject(protoVer);
+        U.writeString(out, compression);
+        out.writeObject(useSSL);
+        out.writeObject(collectMetrix);
+        out.writeObject(jmxReporting);
+        out.writeObject(creds);
+        writeObject(out, loadBalancingPlc);
+        writeObject(out, reconnectionPlc);
+        writeObject(out, retryPlc);
+        writeObject(out, addrTranslator);
+        writeObject(out, speculativeExecutionPlc);
+        writeObject(out, authProvider);
+        writeObject(out, sslOptions);
+        writeObject(out, poolingOptions);
+        writeObject(out, sockOptions);
+        writeObject(out, nettyOptions);
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Restores the fields in exactly the order {@link #writeExternal(ObjectOutput)} wrote them.
+     */
+    @SuppressWarnings("unchecked")
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        fetchSize = (Integer)in.readObject();
+        readConsistency = (ConsistencyLevel)in.readObject();
+        writeConsistency = (ConsistencyLevel)in.readObject();
+        user = U.readString(in);
+        pwd = U.readString(in);
+        port = (Integer)in.readObject();
+        contactPoints = (List<InetAddress>)in.readObject();
+        contactPointsWithPorts = (List<InetSocketAddress>)in.readObject();
+        maxSchemaAgreementWaitSeconds = (Integer)in.readObject();
+        protoVer = (Integer)in.readObject();
+        compression = U.readString(in);
+        useSSL = (Boolean)in.readObject();
+        collectMetrix = (Boolean)in.readObject();
+        jmxReporting = (Boolean)in.readObject();
+        creds = (Credentials)in.readObject();
+        loadBalancingPlc = (LoadBalancingPolicy)readObject(in);
+        reconnectionPlc = (ReconnectionPolicy)readObject(in);
+        retryPlc = (RetryPolicy)readObject(in);
+        addrTranslator = (AddressTranslator)readObject(in);
+        speculativeExecutionPlc = (SpeculativeExecutionPolicy)readObject(in);
+        authProvider = (AuthProvider)readObject(in);
+        sslOptions = (SSLOptions)readObject(in);
+        poolingOptions = (PoolingOptions)readObject(in);
+        sockOptions = (SocketOptions)readObject(in);
+        nettyOptions = (NettyOptions)readObject(in);
+    }
+
+    /**
+     * Writes {@code obj}, or the {@code NULL_OBJECT} marker if it is {@code null} or not {@link Serializable}.
+     * @param out Stream to write the object to.
+     * @param obj Object to write, may be {@code null}.
+     * @throws IOException If the underlying stream fails.
+     */
+    private void writeObject(ObjectOutput out, Object obj) throws IOException {
+        out.writeObject(obj instanceof Serializable ? obj : NULL_OBJECT);
+    }
+
+    /**
+     * Reads one object from the stream and maps the {@code NULL_OBJECT} marker back to {@code null}.
+     * @param in Stream to read the object from.
+     * @throws IOException If the underlying stream fails.
+     * @throws ClassNotFoundException If the class of the restored object cannot be found.
+     * @return Restored object, or {@code null} if the marker was read.
+     */
+    private Object readObject(ObjectInput in) throws IOException, ClassNotFoundException {
+        Object restored = in.readObject();
+        return NULL_OBJECT.equals(restored) ? null : restored;
+    }
+
     /**
      * Parses consistency level provided as string.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ba74af4/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/PlainCredentials.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/PlainCredentials.java b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/PlainCredentials.java
index 9d0710e..46ebdc5 100644
--- a/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/PlainCredentials.java
+++ b/modules/cassandra/src/main/java/org/apache/ignite/cache/store/cassandra/datasource/PlainCredentials.java
@@ -21,6 +21,9 @@ package org.apache.ignite.cache.store.cassandra.datasource;
  * Simple implementation of {@link Credentials} which just uses its constructor to hold user/password values.
  */
 public class PlainCredentials implements Credentials {
+    /** */
+    private static final long serialVersionUID = 0L;
+
     /** User name. */
     private String user;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ba74af4/modules/cassandra/src/test/java/org/apache/ignite/tests/DatasourceSerializationTest.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/test/java/org/apache/ignite/tests/DatasourceSerializationTest.java b/modules/cassandra/src/test/java/org/apache/ignite/tests/DatasourceSerializationTest.java
new file mode 100644
index 0000000..ceb90e0
--- /dev/null
+++ b/modules/cassandra/src/test/java/org/apache/ignite/tests/DatasourceSerializationTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
+import com.datastax.driver.core.policies.RoundRobinPolicy;
+import com.datastax.driver.core.policies.TokenAwarePolicy;
+
+import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.ignite.cache.store.cassandra.datasource.Credentials;
+import org.apache.ignite.cache.store.cassandra.datasource.DataSource;
+import org.apache.ignite.cache.store.cassandra.serializer.JavaSerializer;
+import org.apache.ignite.tests.utils.CassandraAdminCredentials;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Verifies that {@link DataSource} settings survive a serialize/deserialize round trip.
+ */
+public class DatasourceSerializationTest {
+    /**
+     * Serializable load balancing policy used as test data; every call is forwarded to {@code plc}.
+     */
+    private static class MyLoadBalancingPolicy implements LoadBalancingPolicy, Serializable {
+        /** Delegate policy; transient, so not serialized. NOTE(review): presumably null after deserialization. */
+        private transient LoadBalancingPolicy plc = new TokenAwarePolicy(new RoundRobinPolicy());
+
+        /** {@inheritDoc} */
+        @Override public void init(Cluster cluster, Collection<Host> hosts) {
+            plc.init(cluster, hosts);
+        }
+
+        /** {@inheritDoc} */
+        @Override public HostDistance distance(Host host) {
+            return plc.distance(host);
+        }
+
+        /** {@inheritDoc} */
+        @Override public Iterator<Host> newQueryPlan(String loggedKeyspace, Statement statement) {
+            return plc.newQueryPlan(loggedKeyspace, statement);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onAdd(Host host) {
+            plc.onAdd(host);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onUp(Host host) {
+            plc.onUp(host);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onDown(Host host) {
+            plc.onDown(host);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onRemove(Host host) {
+            plc.onRemove(host);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close() {
+            plc.close();
+        }
+    }
+
+    /**
+     * Round-trips a configured {@link DataSource} through {@link JavaSerializer} and checks the restored fields.
+     */
+    @Test
+    public void serializationTest() {
+        DataSource src = new DataSource();
+
+        Credentials cred = new CassandraAdminCredentials();
+        String[] points = new String[]{"127.0.0.1", "10.0.0.2", "10.0.0.3"};
+        LoadBalancingPolicy plc = new MyLoadBalancingPolicy();
+
+        src.setCredentials(cred);
+        src.setContactPoints(points);
+        src.setReadConsistency("ONE");
+        src.setWriteConsistency("QUORUM");
+        src.setLoadBalancingPolicy(plc);
+
+        JavaSerializer serializer = new JavaSerializer();
+
+        ByteBuffer buff = serializer.serialize(src);
+        DataSource _src = (DataSource)serializer.deserialize(buff);
+
+        Credentials _cred = (Credentials)getFieldValue(_src, "creds");
+        List<InetAddress> _points = (List<InetAddress>)getFieldValue(_src, "contactPoints");
+        ConsistencyLevel _readCons = (ConsistencyLevel)getFieldValue(_src, "readConsistency");
+        ConsistencyLevel _writeCons = (ConsistencyLevel)getFieldValue(_src, "writeConsistency");
+        LoadBalancingPolicy _plc = (LoadBalancingPolicy)getFieldValue(_src, "loadBalancingPlc");
+
+        assertTrue("Incorrectly serialized/deserialized credentials for Cassandra DataSource",
+            cred.getPassword().equals(_cred.getPassword()) && cred.getUser().equals(_cred.getUser()));
+
+        assertTrue("Incorrectly serialized/deserialized contact points for Cassandra DataSource",
+            "/127.0.0.1".equals(_points.get(0).toString()) &&
+            "/10.0.0.2".equals(_points.get(1).toString()) &&
+            "/10.0.0.3".equals(_points.get(2).toString()));
+
+        assertTrue("Incorrectly serialized/deserialized consistency levels for Cassandra DataSource",
+            ConsistencyLevel.ONE == _readCons && ConsistencyLevel.QUORUM == _writeCons);
+
+        assertTrue("Incorrectly serialized/deserialized load balancing policy for Cassandra DataSource",
+            _plc instanceof MyLoadBalancingPolicy);
+    }
+
+    /**
+     * @param obj Object to read from; the field is looked up among the fields declared by its runtime class.
+     * @param field Field name.
+     * @return Field value, read reflectively; any failure is rethrown wrapped in a {@link RuntimeException}.
+     */
+    private Object getFieldValue(Object obj, String field) {
+        try {
+            Field f = obj.getClass().getDeclaredField(field);
+
+            f.setAccessible(true);
+
+            return f.get(obj);
+        }
+        catch (Throwable e) {
+            throw new RuntimeException("Failed to get field '" + field + "' value", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ba74af4/modules/cassandra/src/test/java/org/apache/ignite/tests/utils/CassandraAdminCredentials.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/test/java/org/apache/ignite/tests/utils/CassandraAdminCredentials.java b/modules/cassandra/src/test/java/org/apache/ignite/tests/utils/CassandraAdminCredentials.java
index 66df6e7..e7047f3 100644
--- a/modules/cassandra/src/test/java/org/apache/ignite/tests/utils/CassandraAdminCredentials.java
+++ b/modules/cassandra/src/test/java/org/apache/ignite/tests/utils/CassandraAdminCredentials.java
@@ -20,10 +20,12 @@ package org.apache.ignite.tests.utils;
 import org.apache.ignite.cache.store.cassandra.datasource.Credentials;
 
 /**
- * Implementation of {@link org.apache.ignite.cache.store.cassandra.datasource.Credentials}
- * providing admin user/password to establish Cassandra session.
+ * Implementation of {@link Credentials} providing admin user/password to establish Cassandra session.
  */
 public class CassandraAdminCredentials implements Credentials {
+    /** */
+    private static final long serialVersionUID = 0L;
+
     /** {@inheritDoc} */
     @Override public String getUser() {
         return CassandraHelper.getAdminUser();

http://git-wip-us.apache.org/repos/asf/ignite/blob/2ba74af4/modules/cassandra/src/test/java/org/apache/ignite/tests/utils/CassandraRegularCredentials.java
----------------------------------------------------------------------
diff --git a/modules/cassandra/src/test/java/org/apache/ignite/tests/utils/CassandraRegularCredentials.java b/modules/cassandra/src/test/java/org/apache/ignite/tests/utils/CassandraRegularCredentials.java
index 52937ea..7546c9b 100644
--- a/modules/cassandra/src/test/java/org/apache/ignite/tests/utils/CassandraRegularCredentials.java
+++ b/modules/cassandra/src/test/java/org/apache/ignite/tests/utils/CassandraRegularCredentials.java
@@ -20,10 +20,12 @@ package org.apache.ignite.tests.utils;
 import org.apache.ignite.cache.store.cassandra.datasource.Credentials;
 
 /**
- * Implementation of {@link org.apache.ignite.cache.store.cassandra.datasource.Credentials}
- * providing regular user/password to establish Cassandra session.
+ * Implementation of {@link Credentials} providing regular user/password to establish Cassandra session.
  */
 public class CassandraRegularCredentials implements Credentials {
+    /** */
+    private static final long serialVersionUID = 0L;
+
     /** {@inheritDoc} */
     @Override public String getUser() {
         return CassandraHelper.getRegularUser();