You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2015/01/13 17:27:30 UTC

[2/3] cassandra git commit: Fix loading set types in pig with the 2.1 client driver.

Fix loading set types in pig with the 2.1 client driver.

Patch by Artem Aliev, reviewed by brandonwilliams for CASSANDRA-8577


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1cb426b9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1cb426b9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1cb426b9

Branch: refs/heads/trunk
Commit: 1cb426b9831b42b5f368eac51a6e3bebdb1bd62a
Parents: 0757dc7
Author: Brandon Williams <br...@apache.org>
Authored: Tue Jan 13 10:23:28 2015 -0600
Committer: Brandon Williams <br...@apache.org>
Committed: Tue Jan 13 10:23:28 2015 -0600

----------------------------------------------------------------------
 .../apache/cassandra/hadoop/cql3/CqlConfigHelper.java   | 11 +++++++++++
 .../apache/cassandra/hadoop/cql3/CqlRecordReader.java   | 12 ++++++++++++
 .../cassandra/hadoop/pig/AbstractCassandraStorage.java  |  5 ++++-
 .../apache/cassandra/hadoop/pig/CqlNativeStorage.java   |  4 ++++
 4 files changed, 31 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1cb426b9/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
index 2be811f..7d65663 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
@@ -81,6 +81,8 @@ public class CqlConfigHelper
     private static final String INPUT_NATIVE_SSL_KEY_STORE_PASSWARD = "cassandra.input.native.ssl.key.store.password";
     private static final String INPUT_NATIVE_SSL_CIPHER_SUITES = "cassandra.input.native.ssl.cipher.suites";
 
+    private static final String INPUT_NATIVE_PROTOCOL_VERSION = "cassandra.input.native.protocol.version";
+
     private static final String OUTPUT_CQL = "cassandra.output.cql";
     
     /**
@@ -279,6 +281,10 @@ public class CqlConfigHelper
         return conf.get(OUTPUT_CQL);
     }
 
+    private static Optional<Integer> getProtocolVersion(Configuration conf) {
+        return getIntSetting(INPUT_NATIVE_PROTOCOL_VERSION, conf);
+    }
+
     public static Cluster getInputCluster(String host, Configuration conf)
     {
         // this method has been left for backward compatibility
@@ -290,6 +296,7 @@ public class CqlConfigHelper
         int port = getInputNativePort(conf);
         Optional<AuthProvider> authProvider = getAuthProvider(conf);
         Optional<SSLOptions> sslOptions = getSSLOptions(conf);
+        Optional<Integer> protocolVersion = getProtocolVersion(conf);
         LoadBalancingPolicy loadBalancingPolicy = getReadLoadBalancingPolicy(conf, hosts);
         SocketOptions socketOptions = getReadSocketOptions(conf);
         QueryOptions queryOptions = getReadQueryOptions(conf);
@@ -305,6 +312,9 @@ public class CqlConfigHelper
         if (sslOptions.isPresent())
             builder.withSSL(sslOptions.get());
 
+        if (protocolVersion.isPresent()) {
+            builder.withProtocolVersion(protocolVersion.get());
+        }
         builder.withLoadBalancingPolicy(loadBalancingPolicy)
                .withSocketOptions(socketOptions)
                .withQueryOptions(queryOptions)
@@ -313,6 +323,7 @@ public class CqlConfigHelper
         return builder.build();
     }
 
+
     public static void setInputCoreConnections(Configuration conf, String connections)
     {
         conf.set(INPUT_NATIVE_CORE_CONNECTIONS_PER_HOST, connections);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1cb426b9/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
index 9c1118b..6a1f5bf 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
@@ -89,6 +89,7 @@ public class CqlRecordReader extends RecordReader<Long, Row>
 
     // partition keys -- key aliases
     private LinkedHashMap<String, Boolean> partitionBoundColumns = Maps.newLinkedHashMap();
+    protected int nativeProtocolVersion = 1;
 
     public CqlRecordReader()
     {
@@ -129,6 +130,9 @@ public class CqlRecordReader extends RecordReader<Long, Row>
         if (session == null)
           throw new RuntimeException("Can't create connection session");
 
+        //get negotiated serialization protocol
+        nativeProtocolVersion = cluster.getConfiguration().getProtocolOptions().getProtocolVersion();
+
         // If the user provides a CQL query then we will use it without validation
         // otherwise we will fall back to building a query using the:
         //   inputColumns
@@ -230,6 +234,14 @@ public class CqlRecordReader extends RecordReader<Long, Row>
         return new WrappedRow();
     }
 
+    /**
+     * Return native version protocol of the cluster connection
+     * @return serialization protocol version.
+     */
+    public int getNativeProtocolVersion() {
+        return nativeProtocolVersion;
+    }
+
     /** CQL row iterator 
      *  Input cql query  
      *  1) select clause must include key columns (if we use partition key based row count)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1cb426b9/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index 361baa4..035f99a 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@ -62,6 +62,7 @@ import org.slf4j.LoggerFactory;
  */
 public abstract class AbstractCassandraStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata
 {
+
     protected enum MarshallerType { COMPARATOR, DEFAULT_VALIDATOR, KEY_VALIDATOR, SUBCOMPARATOR };
 
     // system environment variables that can be set to configure connection info:
@@ -101,6 +102,8 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
     protected boolean usePartitionFilter = false;
     protected String initHostAddress;
     protected String rpcPort;
+    protected int nativeProtocolVersion = 1;
+
 
     public AbstractCassandraStorage()
     {
@@ -793,7 +796,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
         {
             // For CollectionType, the compose() method assumes the v3 protocol format of collection, which
             // is not correct here since we query using the CQL-over-thrift interface which use the pre-v3 format
-            return ((CollectionSerializer)validator.getSerializer()).deserializeForNativeProtocol(value, 1);
+            return ((CollectionSerializer)validator.getSerializer()).deserializeForNativeProtocol(value, nativeProtocolVersion);
         }
 
         return validator.compose(value);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1cb426b9/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
index 3c59a1c..f0bb8f9 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.db.composites.CellNames;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.hadoop.ConfigHelper;
 import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
+import org.apache.cassandra.hadoop.cql3.CqlRecordReader;
 import org.apache.cassandra.thrift.CfDef;
 import org.apache.cassandra.thrift.ColumnDef;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -78,6 +79,9 @@ public class CqlNativeStorage extends CqlStorage
     public void prepareToRead(RecordReader reader, PigSplit split)
     {
         this.reader = reader;
+        if (reader instanceof CqlRecordReader) {
+            nativeProtocolVersion = ((CqlRecordReader) reader).getNativeProtocolVersion();
+        }
     }
 
     /** get next row */