You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2015/01/13 17:27:29 UTC
[1/3] cassandra git commit: Fix loading set types in pig with the 2.1
client driver.
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.1 0757dc72f -> 1cb426b98
refs/heads/trunk f1475244f -> 42e483a4e
Fix loading set types in pig with the 2.1 client driver.
Patch by Artem Aliev, reviewed by brandonwilliams for CASSANDRA-8577
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1cb426b9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1cb426b9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1cb426b9
Branch: refs/heads/cassandra-2.1
Commit: 1cb426b9831b42b5f368eac51a6e3bebdb1bd62a
Parents: 0757dc7
Author: Brandon Williams <br...@apache.org>
Authored: Tue Jan 13 10:23:28 2015 -0600
Committer: Brandon Williams <br...@apache.org>
Committed: Tue Jan 13 10:23:28 2015 -0600
----------------------------------------------------------------------
.../apache/cassandra/hadoop/cql3/CqlConfigHelper.java | 11 +++++++++++
.../apache/cassandra/hadoop/cql3/CqlRecordReader.java | 12 ++++++++++++
.../cassandra/hadoop/pig/AbstractCassandraStorage.java | 5 ++++-
.../apache/cassandra/hadoop/pig/CqlNativeStorage.java | 4 ++++
4 files changed, 31 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1cb426b9/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
index 2be811f..7d65663 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
@@ -81,6 +81,8 @@ public class CqlConfigHelper
private static final String INPUT_NATIVE_SSL_KEY_STORE_PASSWARD = "cassandra.input.native.ssl.key.store.password";
private static final String INPUT_NATIVE_SSL_CIPHER_SUITES = "cassandra.input.native.ssl.cipher.suites";
+ private static final String INPUT_NATIVE_PROTOCOL_VERSION = "cassandra.input.native.protocol.version";
+
private static final String OUTPUT_CQL = "cassandra.output.cql";
/**
@@ -279,6 +281,10 @@ public class CqlConfigHelper
return conf.get(OUTPUT_CQL);
}
+ private static Optional<Integer> getProtocolVersion(Configuration conf) {
+ return getIntSetting(INPUT_NATIVE_PROTOCOL_VERSION, conf);
+ }
+
public static Cluster getInputCluster(String host, Configuration conf)
{
// this method has been left for backward compatibility
@@ -290,6 +296,7 @@ public class CqlConfigHelper
int port = getInputNativePort(conf);
Optional<AuthProvider> authProvider = getAuthProvider(conf);
Optional<SSLOptions> sslOptions = getSSLOptions(conf);
+ Optional<Integer> protocolVersion = getProtocolVersion(conf);
LoadBalancingPolicy loadBalancingPolicy = getReadLoadBalancingPolicy(conf, hosts);
SocketOptions socketOptions = getReadSocketOptions(conf);
QueryOptions queryOptions = getReadQueryOptions(conf);
@@ -305,6 +312,9 @@ public class CqlConfigHelper
if (sslOptions.isPresent())
builder.withSSL(sslOptions.get());
+ if (protocolVersion.isPresent()) {
+ builder.withProtocolVersion(protocolVersion.get());
+ }
builder.withLoadBalancingPolicy(loadBalancingPolicy)
.withSocketOptions(socketOptions)
.withQueryOptions(queryOptions)
@@ -313,6 +323,7 @@ public class CqlConfigHelper
return builder.build();
}
+
public static void setInputCoreConnections(Configuration conf, String connections)
{
conf.set(INPUT_NATIVE_CORE_CONNECTIONS_PER_HOST, connections);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1cb426b9/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
index 9c1118b..6a1f5bf 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
@@ -89,6 +89,7 @@ public class CqlRecordReader extends RecordReader<Long, Row>
// partition keys -- key aliases
private LinkedHashMap<String, Boolean> partitionBoundColumns = Maps.newLinkedHashMap();
+ protected int nativeProtocolVersion = 1;
public CqlRecordReader()
{
@@ -129,6 +130,9 @@ public class CqlRecordReader extends RecordReader<Long, Row>
if (session == null)
throw new RuntimeException("Can't create connection session");
+ //get negotiated serialization protocol
+ nativeProtocolVersion = cluster.getConfiguration().getProtocolOptions().getProtocolVersion();
+
// If the user provides a CQL query then we will use it without validation
// otherwise we will fall back to building a query using the:
// inputColumns
@@ -230,6 +234,14 @@ public class CqlRecordReader extends RecordReader<Long, Row>
return new WrappedRow();
}
+ /**
+ * Return the native protocol version of the cluster connection
+ * @return serialization protocol version.
+ */
+ public int getNativeProtocolVersion() {
+ return nativeProtocolVersion;
+ }
+
/** CQL row iterator
* Input cql query
* 1) select clause must include key columns (if we use partition key based row count)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1cb426b9/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index 361baa4..035f99a 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@ -62,6 +62,7 @@ import org.slf4j.LoggerFactory;
*/
public abstract class AbstractCassandraStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata
{
+
protected enum MarshallerType { COMPARATOR, DEFAULT_VALIDATOR, KEY_VALIDATOR, SUBCOMPARATOR };
// system environment variables that can be set to configure connection info:
@@ -101,6 +102,8 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
protected boolean usePartitionFilter = false;
protected String initHostAddress;
protected String rpcPort;
+ protected int nativeProtocolVersion = 1;
+
public AbstractCassandraStorage()
{
@@ -793,7 +796,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
{
// For CollectionType, the compose() method assumes the v3 protocol format of collection, which
// is not correct here since we query using the CQL-over-thrift interface which use the pre-v3 format
- return ((CollectionSerializer)validator.getSerializer()).deserializeForNativeProtocol(value, 1);
+ return ((CollectionSerializer)validator.getSerializer()).deserializeForNativeProtocol(value, nativeProtocolVersion);
}
return validator.compose(value);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1cb426b9/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
index 3c59a1c..f0bb8f9 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.db.composites.CellNames;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
+import org.apache.cassandra.hadoop.cql3.CqlRecordReader;
import org.apache.cassandra.thrift.CfDef;
import org.apache.cassandra.thrift.ColumnDef;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -78,6 +79,9 @@ public class CqlNativeStorage extends CqlStorage
public void prepareToRead(RecordReader reader, PigSplit split)
{
this.reader = reader;
+ if (reader instanceof CqlRecordReader) {
+ nativeProtocolVersion = ((CqlRecordReader) reader).getNativeProtocolVersion();
+ }
}
/** get next row */
[2/3] cassandra git commit: Fix loading set types in pig with the 2.1
client driver.
Posted by br...@apache.org.
Fix loading set types in pig with the 2.1 client driver.
Patch by Artem Aliev, reviewed by brandonwilliams for CASSANDRA-8577
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1cb426b9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1cb426b9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1cb426b9
Branch: refs/heads/trunk
Commit: 1cb426b9831b42b5f368eac51a6e3bebdb1bd62a
Parents: 0757dc7
Author: Brandon Williams <br...@apache.org>
Authored: Tue Jan 13 10:23:28 2015 -0600
Committer: Brandon Williams <br...@apache.org>
Committed: Tue Jan 13 10:23:28 2015 -0600
----------------------------------------------------------------------
.../apache/cassandra/hadoop/cql3/CqlConfigHelper.java | 11 +++++++++++
.../apache/cassandra/hadoop/cql3/CqlRecordReader.java | 12 ++++++++++++
.../cassandra/hadoop/pig/AbstractCassandraStorage.java | 5 ++++-
.../apache/cassandra/hadoop/pig/CqlNativeStorage.java | 4 ++++
4 files changed, 31 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1cb426b9/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
index 2be811f..7d65663 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
@@ -81,6 +81,8 @@ public class CqlConfigHelper
private static final String INPUT_NATIVE_SSL_KEY_STORE_PASSWARD = "cassandra.input.native.ssl.key.store.password";
private static final String INPUT_NATIVE_SSL_CIPHER_SUITES = "cassandra.input.native.ssl.cipher.suites";
+ private static final String INPUT_NATIVE_PROTOCOL_VERSION = "cassandra.input.native.protocol.version";
+
private static final String OUTPUT_CQL = "cassandra.output.cql";
/**
@@ -279,6 +281,10 @@ public class CqlConfigHelper
return conf.get(OUTPUT_CQL);
}
+ private static Optional<Integer> getProtocolVersion(Configuration conf) {
+ return getIntSetting(INPUT_NATIVE_PROTOCOL_VERSION, conf);
+ }
+
public static Cluster getInputCluster(String host, Configuration conf)
{
// this method has been left for backward compatibility
@@ -290,6 +296,7 @@ public class CqlConfigHelper
int port = getInputNativePort(conf);
Optional<AuthProvider> authProvider = getAuthProvider(conf);
Optional<SSLOptions> sslOptions = getSSLOptions(conf);
+ Optional<Integer> protocolVersion = getProtocolVersion(conf);
LoadBalancingPolicy loadBalancingPolicy = getReadLoadBalancingPolicy(conf, hosts);
SocketOptions socketOptions = getReadSocketOptions(conf);
QueryOptions queryOptions = getReadQueryOptions(conf);
@@ -305,6 +312,9 @@ public class CqlConfigHelper
if (sslOptions.isPresent())
builder.withSSL(sslOptions.get());
+ if (protocolVersion.isPresent()) {
+ builder.withProtocolVersion(protocolVersion.get());
+ }
builder.withLoadBalancingPolicy(loadBalancingPolicy)
.withSocketOptions(socketOptions)
.withQueryOptions(queryOptions)
@@ -313,6 +323,7 @@ public class CqlConfigHelper
return builder.build();
}
+
public static void setInputCoreConnections(Configuration conf, String connections)
{
conf.set(INPUT_NATIVE_CORE_CONNECTIONS_PER_HOST, connections);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1cb426b9/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
index 9c1118b..6a1f5bf 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
@@ -89,6 +89,7 @@ public class CqlRecordReader extends RecordReader<Long, Row>
// partition keys -- key aliases
private LinkedHashMap<String, Boolean> partitionBoundColumns = Maps.newLinkedHashMap();
+ protected int nativeProtocolVersion = 1;
public CqlRecordReader()
{
@@ -129,6 +130,9 @@ public class CqlRecordReader extends RecordReader<Long, Row>
if (session == null)
throw new RuntimeException("Can't create connection session");
+ //get negotiated serialization protocol
+ nativeProtocolVersion = cluster.getConfiguration().getProtocolOptions().getProtocolVersion();
+
// If the user provides a CQL query then we will use it without validation
// otherwise we will fall back to building a query using the:
// inputColumns
@@ -230,6 +234,14 @@ public class CqlRecordReader extends RecordReader<Long, Row>
return new WrappedRow();
}
+ /**
+ * Return the native protocol version of the cluster connection
+ * @return serialization protocol version.
+ */
+ public int getNativeProtocolVersion() {
+ return nativeProtocolVersion;
+ }
+
/** CQL row iterator
* Input cql query
* 1) select clause must include key columns (if we use partition key based row count)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1cb426b9/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
index 361baa4..035f99a 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
@@ -62,6 +62,7 @@ import org.slf4j.LoggerFactory;
*/
public abstract class AbstractCassandraStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata
{
+
protected enum MarshallerType { COMPARATOR, DEFAULT_VALIDATOR, KEY_VALIDATOR, SUBCOMPARATOR };
// system environment variables that can be set to configure connection info:
@@ -101,6 +102,8 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
protected boolean usePartitionFilter = false;
protected String initHostAddress;
protected String rpcPort;
+ protected int nativeProtocolVersion = 1;
+
public AbstractCassandraStorage()
{
@@ -793,7 +796,7 @@ public abstract class AbstractCassandraStorage extends LoadFunc implements Store
{
// For CollectionType, the compose() method assumes the v3 protocol format of collection, which
// is not correct here since we query using the CQL-over-thrift interface which use the pre-v3 format
- return ((CollectionSerializer)validator.getSerializer()).deserializeForNativeProtocol(value, 1);
+ return ((CollectionSerializer)validator.getSerializer()).deserializeForNativeProtocol(value, nativeProtocolVersion);
}
return validator.compose(value);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1cb426b9/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
index 3c59a1c..f0bb8f9 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.db.composites.CellNames;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
+import org.apache.cassandra.hadoop.cql3.CqlRecordReader;
import org.apache.cassandra.thrift.CfDef;
import org.apache.cassandra.thrift.ColumnDef;
import org.apache.cassandra.utils.ByteBufferUtil;
@@ -78,6 +79,9 @@ public class CqlNativeStorage extends CqlStorage
public void prepareToRead(RecordReader reader, PigSplit split)
{
this.reader = reader;
+ if (reader instanceof CqlRecordReader) {
+ nativeProtocolVersion = ((CqlRecordReader) reader).getNativeProtocolVersion();
+ }
}
/** get next row */
[3/3] cassandra git commit: Merge branch 'cassandra-2.1' into trunk
Posted by br...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/42e483a4
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/42e483a4
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/42e483a4
Branch: refs/heads/trunk
Commit: 42e483a4e1dc69e886e4c0d1117d7081abaa3768
Parents: f147524 1cb426b
Author: Brandon Williams <br...@apache.org>
Authored: Tue Jan 13 10:27:18 2015 -0600
Committer: Brandon Williams <br...@apache.org>
Committed: Tue Jan 13 10:27:18 2015 -0600
----------------------------------------------------------------------
.../apache/cassandra/hadoop/cql3/CqlConfigHelper.java | 11 +++++++++++
.../apache/cassandra/hadoop/cql3/CqlRecordReader.java | 12 ++++++++++++
.../cassandra/hadoop/pig/AbstractCassandraStorage.java | 5 ++++-
.../apache/cassandra/hadoop/pig/CqlNativeStorage.java | 4 ++++
4 files changed, 31 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/42e483a4/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/42e483a4/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/42e483a4/src/java/org/apache/cassandra/hadoop/pig/AbstractCassandraStorage.java
----------------------------------------------------------------------