You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/09/15 17:12:56 UTC
[1/2] cassandra git commit: (Pig) support BulkOutputFormat as a URL
parameter
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.2 979179609 -> cd4a1e6ac
(Pig) support BulkOutputFormat as a URL parameter
patch by Alex Liu; reviewed by Piotr Kołaczkowski for CASSANDRA-7410
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c7b40735
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c7b40735
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c7b40735
Branch: refs/heads/cassandra-2.2
Commit: c7b40735789c840529002eb3c11d8731f460d61c
Parents: ae51086
Author: Alex Liu <al...@yahoo.com>
Authored: Tue Sep 15 16:06:18 2015 +0100
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Tue Sep 15 16:08:54 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../hadoop/cql3/CqlBulkOutputFormat.java | 93 +++++++-
.../hadoop/cql3/CqlBulkRecordWriter.java | 87 ++++----
.../cassandra/hadoop/pig/CqlNativeStorage.java | 213 +++++++++++++------
.../apache/cassandra/hadoop/pig/CqlStorage.java | 1 -
.../org/apache/cassandra/tools/BulkLoader.java | 2 +-
test/conf/cassandra.yaml | 1 +
.../org/apache/cassandra/pig/CqlTableTest.java | 36 ++++
.../org/apache/cassandra/pig/PigTestBase.java | 3 +-
9 files changed, 336 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7b40735/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index dff47fc..5f11049 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.10
+ * (Pig) support BulkOutputFormat as a URL parameter (CASSANDRA-7410)
* BATCH statement is broken in cqlsh (CASSANDRA-10272)
* Added configurable warning threshold for GC duration (CASSANDRA-8907)
* (cqlsh) Make cqlsh PEP8 compliant (CASSANDRA-10066)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7b40735/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
index 887fe8e..7fedb41 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
+import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.hadoop.AbstractBulkOutputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.hadoop.conf.Configuration;
@@ -54,6 +55,16 @@ public class CqlBulkOutputFormat extends AbstractBulkOutputFormat<Object, List<B
private static final String OUTPUT_CQL_SCHEMA_PREFIX = "cassandra.columnfamily.schema.";
private static final String OUTPUT_CQL_INSERT_PREFIX = "cassandra.columnfamily.insert.";
private static final String DELETE_SOURCE = "cassandra.output.delete.source";
+ private static final String OUTPUT_CQL_STORAGE_PORT = "cassandra.storage.port";
+ private static final String OUTPUT_CQL_SSL_STORAGE_PORT = "cassandra.ssl.storage.port";
+ private static final String INTERNODE_ENCRYPTION = "cassandra.internode.encryption";
+ private static final String SERVER_KEYSTORE = "cassandra.server.keystore";
+ private static final String SERVER_KEYSTORE_PASSWORD = "cassandra.server.keystore.password";
+ private static final String SERVER_TRUSTSTORE = "cassandra.server.truststore";
+ private static final String SERVER_TRUSTSTORE_PASSWORD = "cassandra.server.truststore.password";
+ private static final String SERVER_CIPHER_SUITES = "cassandra.server.cipher.suites";
+ public static final int DEFAULT_STORAGE_PORT = 7000;
+ public static final int DEFAULT_SSL_STORAGE_PORT = 7001;
/** Fills the deprecated OutputFormat interface for streaming. */
@Deprecated
@@ -84,7 +95,87 @@ public class CqlBulkOutputFormat extends AbstractBulkOutputFormat<Object, List<B
{
conf.set(OUTPUT_CQL_INSERT_PREFIX + columnFamily, insertStatement);
}
-
+
+ public static void setStoragePort(Configuration conf, int port)
+ {
+ conf.set(OUTPUT_CQL_STORAGE_PORT, "" + port);
+ }
+
+ public static void setSSLStoragePort(Configuration conf, int port)
+ {
+ conf.set(OUTPUT_CQL_SSL_STORAGE_PORT, "" + port);
+ }
+
+ public static void setInternodeEncryption(Configuration conf, String encrypt)
+ {
+ conf.set(INTERNODE_ENCRYPTION, encrypt);
+ }
+
+ public static void setServerKeystore(Configuration conf, String keystore)
+ {
+ conf.set(SERVER_KEYSTORE, keystore);
+ }
+
+ public static void setServerKeystorePassword(Configuration conf, String keystorePass)
+ {
+ conf.set(SERVER_KEYSTORE_PASSWORD, keystorePass);
+ }
+
+ public static void setServerTruststore(Configuration conf, String truststore)
+ {
+ conf.set(SERVER_TRUSTSTORE, truststore);
+ }
+
+ public static void setServerTruststorePassword(Configuration conf, String truststorePass)
+ {
+ conf.set(SERVER_TRUSTSTORE_PASSWORD, truststorePass);
+ }
+
+ public static void setServerCipherSuites(Configuration conf, String cipherSuites)
+ {
+ conf.set(SERVER_CIPHER_SUITES, cipherSuites);
+ }
+
+ public static int getStoragePort(Configuration conf)
+ {
+ return conf.getInt(OUTPUT_CQL_STORAGE_PORT, DEFAULT_STORAGE_PORT);
+ }
+
+ public static int getSSLStoragePort(Configuration conf)
+ {
+ return conf.getInt(OUTPUT_CQL_SSL_STORAGE_PORT, DEFAULT_SSL_STORAGE_PORT);
+ }
+
+ public static String getInternodeEncryption(Configuration conf)
+ {
+ return conf.get(INTERNODE_ENCRYPTION, EncryptionOptions.ServerEncryptionOptions.InternodeEncryption.none.name());
+ }
+
+ public static String getServerKeystore(Configuration conf)
+ {
+ return conf.get(SERVER_KEYSTORE);
+ }
+
+ public static String getServerTruststore(Configuration conf)
+ {
+ return conf.get(SERVER_TRUSTSTORE);
+ }
+
+ public static String getServerKeystorePassword(Configuration conf)
+ {
+ return conf.get(SERVER_KEYSTORE_PASSWORD);
+ }
+
+ public static String getServerTruststorePassword(Configuration conf)
+ {
+ return conf.get(SERVER_TRUSTSTORE_PASSWORD);
+ }
+
+ public static String getServerCipherSuites(Configuration conf)
+ {
+ return conf.get(SERVER_CIPHER_SUITES);
+ }
+
public static String getColumnFamilySchema(Configuration conf, String columnFamily)
{
String schema = conf.get(OUTPUT_CQL_SCHEMA_PREFIX + columnFamily);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7b40735/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
index e60a240..ced8aa9 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
@@ -19,13 +19,16 @@ package org.apache.cassandra.hadoop.cql3;
import java.io.File;
import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.nio.ByteBuffer;
-import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
-import java.util.Map;
+import java.util.Set;
import java.util.UUID;
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.EncryptionOptions;
+import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.hadoop.AbstractBulkRecordWriter;
import org.apache.cassandra.hadoop.BulkRecordWriter;
@@ -35,6 +38,9 @@ import org.apache.cassandra.io.sstable.CQLSSTableWriter;
import org.apache.cassandra.io.sstable.SSTableLoader;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.thrift.ITransportFactory;
+import org.apache.cassandra.tools.BulkLoader;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.Progressable;
@@ -108,10 +114,7 @@ public class CqlBulkRecordWriter extends AbstractBulkRecordWriter<Object, List<B
}
if (loader == null)
{
- ExternalClient externalClient = new ExternalClient(conf);
-
- externalClient.addKnownCfs(keyspace, schema);
-
+ BulkLoader.ExternalClient externalClient = getExternalClient(conf);
this.loader = new SSTableLoader(outputDir, externalClient, new BulkRecordWriter.NullOutputHandler()) {
@Override
public void onSuccess(StreamState finalState)
@@ -171,41 +174,53 @@ public class CqlBulkRecordWriter extends AbstractBulkRecordWriter<Object, List<B
return dir;
}
-
- public static class ExternalClient extends AbstractBulkRecordWriter.ExternalClient
- {
- private Map<String, Map<String, CFMetaData>> knownCqlCfs = new HashMap<>();
-
- public ExternalClient(Configuration conf)
- {
- super(conf);
- }
- public void addKnownCfs(String keyspace, String cql)
+ private BulkLoader.ExternalClient getExternalClient(Configuration conf)
+ {
+ Set<InetAddress> hosts = new HashSet<InetAddress>();
+ String outputAddress = ConfigHelper.getOutputInitialAddress(conf);
+ if (outputAddress == null) outputAddress = "localhost";
+ String[] nodes = outputAddress.split(",");
+ for (String node : nodes)
{
- Map<String, CFMetaData> cfs = knownCqlCfs.get(keyspace);
-
- if (cfs == null)
+ try
{
- cfs = new HashMap<>();
- knownCqlCfs.put(keyspace, cfs);
+ hosts.add(InetAddress.getByName(node));
}
-
- CFMetaData metadata = CFMetaData.compile(cql, keyspace);
- cfs.put(metadata.cfName, metadata);
- }
-
- @Override
- public CFMetaData getCFMetaData(String keyspace, String cfName)
- {
- CFMetaData metadata = super.getCFMetaData(keyspace, cfName);
- if (metadata != null)
+ catch (UnknownHostException e)
{
- return metadata;
+ throw new RuntimeException(e);
}
-
- Map<String, CFMetaData> cfs = knownCqlCfs.get(keyspace);
- return cfs != null ? cfs.get(cfName) : null;
}
+ int rpcPort = ConfigHelper.getOutputRpcPort(conf);
+ String username = ConfigHelper.getOutputKeyspaceUserName(conf);
+ String password = ConfigHelper.getOutputKeyspacePassword(conf);
+ ITransportFactory transportFactory = ConfigHelper.getClientTransportFactory(conf);
+ return new BulkLoader.ExternalClient(hosts,
+ rpcPort,
+ username,
+ password,
+ transportFactory,
+ CqlBulkOutputFormat.getStoragePort(conf),
+ CqlBulkOutputFormat.getSSLStoragePort(conf),
+ getServerEncryptOpt(conf));
+ }
+
+ private ServerEncryptionOptions getServerEncryptOpt(Configuration conf)
+ {
+ ServerEncryptionOptions encryptOpt = new ServerEncryptionOptions();
+ String internodeEncrypt = CqlBulkOutputFormat.getInternodeEncryption(conf);
+ if (StringUtils.isEmpty(internodeEncrypt))
+ return encryptOpt;
+
+ encryptOpt.internode_encryption = EncryptionOptions.ServerEncryptionOptions.InternodeEncryption.valueOf(internodeEncrypt);
+ encryptOpt.keystore = CqlBulkOutputFormat.getServerKeystore(conf);
+ encryptOpt.truststore = CqlBulkOutputFormat.getServerTruststore(conf);
+ encryptOpt.keystore_password = CqlBulkOutputFormat.getServerKeystorePassword(conf);
+ encryptOpt.truststore_password = CqlBulkOutputFormat.getServerTruststorePassword(conf);
+ String cipherSuites = CqlBulkOutputFormat.getServerCipherSuites(conf);
+ if (!StringUtils.isEmpty(cipherSuites))
+ encryptOpt.cipher_suites = cipherSuites.replace(" ", "").split(",");
+ return encryptOpt;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7b40735/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
index 7887085..5287bf5 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
@@ -24,18 +24,21 @@ import java.util.*;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions.InternodeEncryption;
import org.apache.cassandra.db.BufferCell;
import org.apache.cassandra.db.Cell;
import org.apache.cassandra.db.composites.CellNames;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.hadoop.AbstractBulkRecordWriter;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.hadoop.HadoopCompat;
+import org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat;
import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
import org.apache.cassandra.hadoop.cql3.CqlRecordReader;
import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.utils.*;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.mapreduce.*;
import org.apache.pig.Expression;
import org.apache.pig.ResourceSchema;
@@ -54,6 +57,7 @@ import com.datastax.driver.core.Row;
public class CqlNativeStorage extends AbstractCassandraStorage
{
private static final Logger logger = LoggerFactory.getLogger(CqlNativeStorage.class);
+ public static String BULK_OUTPUT_FORMAT = "org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat";
private int pageSize = 1000;
private String columns;
private String outputQuery;
@@ -83,6 +87,22 @@ public class CqlNativeStorage extends AbstractCassandraStorage
private String nativeSSLCipherSuites;
private String inputCql;
+ private boolean bulkOutputFormat = false;
+ private String bulkCfSchema;
+ private String bulkInsertStatement;
+ private String bulkOutputLocation;
+ private int bulkBuffSize = -1;
+ private int bulkStreamThrottle = -1;
+ private int bulkMaxFailedHosts = -1;
+ private int storagePort = CqlBulkOutputFormat.DEFAULT_STORAGE_PORT;
+ private int sslStoragePort = CqlBulkOutputFormat.DEFAULT_SSL_STORAGE_PORT;
+ private String serverKeystore;
+ private String serverKeystorePassword;
+ private String serverTruststore;
+ private String serverTruststorePassword;
+ private String serverCipherSuites;
+ private String internodeEncrypt;
+
public CqlNativeStorage()
{
this(1000);
@@ -386,57 +406,22 @@ public class CqlNativeStorage extends AbstractCassandraStorage
return keys;
}
-
- /** output: (((name, value), (name, value)), (value ... value), (value...value)) */
- public void putNext(Tuple t) throws IOException
- {
- if (t.size() < 1)
- {
- // simply nothing here, we can't even delete without a key
- logger.warn("Empty output skipped, filter empty tuples to suppress this warning");
- return;
- }
-
- if (t.getType(0) == DataType.TUPLE)
- {
- if (t.getType(1) == DataType.TUPLE)
- {
- Map<String, ByteBuffer> key = tupleToKeyMap((Tuple)t.get(0));
- cqlQueryFromTuple(key, t, 1);
- }
- else
- throw new IOException("Second argument in output must be a tuple");
- }
- else
- throw new IOException("First argument in output must be a tuple");
- }
-
/** convert key tuple to key map */
private Map<String, ByteBuffer> tupleToKeyMap(Tuple t) throws IOException
{
Map<String, ByteBuffer> keys = new HashMap<String, ByteBuffer>();
for (int i = 0; i < t.size(); i++)
{
- if (t.getType(i) == DataType.TUPLE)
- {
- Tuple inner = (Tuple) t.get(i);
- if (inner.size() == 2)
- {
- Object name = inner.get(0);
- if (name != null)
- {
- keys.put(name.toString(), objToBB(inner.get(1)));
- }
- else
- throw new IOException("Key name was empty");
- }
- else
- throw new IOException("Keys were not in name and value pairs");
- }
- else
- {
+ if (t.getType(i) != DataType.TUPLE)
throw new IOException("keys was not a tuple");
- }
+
+ Tuple inner = (Tuple) t.get(i);
+ if (inner.size() != 2)
+ throw new IOException("Keys were not in name and value pairs");
+ Object name = inner.get(0);
+ if (name == null)
+ throw new IOException("Key name was empty");
+ keys.put(name.toString(), objToBB(inner.get(1)));
}
return keys;
}
@@ -446,21 +431,16 @@ public class CqlNativeStorage extends AbstractCassandraStorage
{
for (int i = offset; i < t.size(); i++)
{
- if (t.getType(i) == DataType.TUPLE)
- {
- Tuple inner = (Tuple) t.get(i);
- if (inner.size() > 0)
- {
- List<ByteBuffer> bindedVariables = bindedVariablesFromTuple(inner);
- if (bindedVariables.size() > 0)
- sendCqlQuery(key, bindedVariables);
- else
- throw new IOException("Missing binded variables");
- }
- }
- else
- {
+ if (t.getType(i) != DataType.TUPLE)
throw new IOException("Output type was not a tuple");
+
+ Tuple inner = (Tuple) t.get(i);
+ if (inner.size() > 0)
+ {
+ List<ByteBuffer> bindedVariables = bindedVariablesFromTuple(inner);
+ if (bindedVariables.size() <= 0)
+ throw new IOException("Missing binded variables");
+ sendCqlQuery(key, bindedVariables);
}
}
}
@@ -561,6 +541,37 @@ public class CqlNativeStorage extends AbstractCassandraStorage
return property.getProperty(PARTITION_FILTER_SIGNATURE);
}
+ /**
+ * output: (((name, value), (name, value)), (value ... value), (value...value))
+ * bulk output: ((value ... value), (value...value))
+ *
+ * */
+ public void putNext(Tuple t) throws IOException
+ {
+ if (t.size() < 1)
+ {
+ // simply nothing here, we can't even delete without a key
+ logger.warn("Empty output skipped, filter empty tuples to suppress this warning");
+ return;
+ }
+
+ if (t.getType(0) != DataType.TUPLE)
+ throw new IOException("First argument in output must be a tuple");
+
+ if (!bulkOutputFormat && t.getType(1) != DataType.TUPLE)
+ throw new IOException("Second argument in output must be a tuple");
+
+ if (bulkOutputFormat)
+ {
+ cqlQueryFromTuple(null, t, 0);
+ }
+ else
+ {
+ Map<String, ByteBuffer> key = tupleToKeyMap((Tuple)t.get(0));
+ cqlQueryFromTuple(key, t, 1);
+ }
+ }
+
/** set read configuration settings */
public void setLocation(String location, Job job) throws IOException
{
@@ -688,6 +699,42 @@ public class CqlNativeStorage extends AbstractCassandraStorage
ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family);
CqlConfigHelper.setOutputCql(conf, outputQuery);
+ if (bulkOutputFormat)
+ {
+ DEFAULT_OUTPUT_FORMAT = BULK_OUTPUT_FORMAT;
+ if (bulkCfSchema != null)
+ CqlBulkOutputFormat.setColumnFamilySchema(conf, column_family, bulkCfSchema);
+ else
+ throw new IOException("bulk_cf_schema is missing in input url parameter");
+ if (bulkInsertStatement != null)
+ CqlBulkOutputFormat.setColumnFamilyInsertStatement(conf, column_family, bulkInsertStatement);
+ else
+ throw new IOException("bulk_insert_statement is missing in input url parameter");
+ if (bulkOutputLocation != null)
+ conf.set(AbstractBulkRecordWriter.OUTPUT_LOCATION, bulkOutputLocation);
+ if (bulkBuffSize > 0)
+ conf.set(AbstractBulkRecordWriter.BUFFER_SIZE_IN_MB, String.valueOf(bulkBuffSize));
+ if (bulkStreamThrottle > 0)
+ conf.set(AbstractBulkRecordWriter.STREAM_THROTTLE_MBITS, String.valueOf(bulkStreamThrottle));
+ if (bulkMaxFailedHosts > 0)
+ conf.set(AbstractBulkRecordWriter.MAX_FAILED_HOSTS, String.valueOf(bulkMaxFailedHosts));
+ CqlBulkOutputFormat.setSSLStoragePort(conf, sslStoragePort);
+ CqlBulkOutputFormat.setStoragePort(conf, storagePort);
+ if (serverEncrypted())
+ {
+ if (!StringUtils.isEmpty(serverKeystore))
+ CqlBulkOutputFormat.setServerKeystore(conf, serverKeystore);
+ if (!StringUtils.isEmpty(serverTruststore))
+ CqlBulkOutputFormat.setServerTruststore(conf, serverTruststore);
+ if (!StringUtils.isEmpty(serverKeystorePassword))
+ CqlBulkOutputFormat.setServerKeystorePassword(conf, serverKeystorePassword);
+ if (!StringUtils.isEmpty(serverTruststorePassword))
+ CqlBulkOutputFormat.setServerTruststorePassword(conf, serverTruststorePassword);
+ if (!StringUtils.isEmpty(serverCipherSuites))
+ CqlBulkOutputFormat.setServerCipherSuites(conf, serverCipherSuites);
+ }
+ }
+
setConnectionInformation();
if (ConfigHelper.getOutputRpcPort(conf) == 0)
@@ -700,6 +747,12 @@ public class CqlNativeStorage extends AbstractCassandraStorage
initSchema(storeSignature);
}
+ private boolean serverEncrypted()
+ {
+ return !StringUtils.isEmpty(internodeEncrypt) &&
+ InternodeEncryption.none != InternodeEncryption.valueOf(internodeEncrypt.toLowerCase());
+ }
+
private void setLocationFromUri(String location) throws IOException
{
try
@@ -720,6 +773,37 @@ public class CqlNativeStorage extends AbstractCassandraStorage
if (urlQuery.containsKey("output_query"))
outputQuery = urlQuery.get("output_query");
+ if (urlQuery.containsKey("bulk_output_format"))
+ bulkOutputFormat = Boolean.valueOf(urlQuery.get("bulk_output_format"));
+ if (urlQuery.containsKey("bulk_cf_schema"))
+ bulkCfSchema = urlQuery.get("bulk_cf_schema");
+ if (urlQuery.containsKey("bulk_insert_statement"))
+ bulkInsertStatement = urlQuery.get("bulk_insert_statement");
+ if (urlQuery.containsKey("bulk_output_location"))
+ bulkOutputLocation = urlQuery.get("bulk_output_location");
+ if (urlQuery.containsKey("bulk_buff_size"))
+ bulkBuffSize = Integer.valueOf(urlQuery.get("bulk_buff_size"));
+ if (urlQuery.containsKey("bulk_stream_throttle"))
+ bulkStreamThrottle = Integer.valueOf(urlQuery.get("bulk_stream_throttle"));
+ if (urlQuery.containsKey("bulk_max_failed_hosts"))
+ bulkMaxFailedHosts = Integer.valueOf(urlQuery.get("bulk_max_failed_hosts"));
+ if (urlQuery.containsKey("storage_port"))
+ storagePort = Integer.valueOf(urlQuery.get("storage_port"));
+ if (urlQuery.containsKey("ssl_storage_port"))
+ sslStoragePort = Integer.valueOf(urlQuery.get("ssl_storage_port"));
+ if (urlQuery.containsKey("internode_encrypt"))
+ internodeEncrypt = urlQuery.get("internode_encrypt");
+ if (urlQuery.containsKey("server_keystore"))
+ serverKeystore = urlQuery.get("server_keystore");
+ if (urlQuery.containsKey("server_truststore"))
+ serverTruststore = urlQuery.get("server_truststore");
+ if (urlQuery.containsKey("server_keystore_pass"))
+ serverKeystorePassword = urlQuery.get("server_keystore_pass");
+ if (urlQuery.containsKey("server_truststore_pass"))
+ serverTruststorePassword = urlQuery.get("server_truststore_pass");
+ if (urlQuery.containsKey("server_cipher_suites"))
+ serverCipherSuites = urlQuery.get("server_cipher_suites");
+
//split size
if (urlQuery.containsKey("split_size"))
splitSize = Integer.parseInt(urlQuery.get("split_size"));
@@ -804,8 +888,15 @@ public class CqlNativeStorage extends AbstractCassandraStorage
"[&keep_alive=<keep_alive>][&auth_provider=<auth_provider>][&trust_store_path=<trust_store_path>]" +
"[&key_store_path=<key_store_path>][&trust_store_password=<trust_store_password>]" +
"[&key_store_password=<key_store_password>][&cipher_suites=<cipher_suites>][&input_cql=<input_cql>]" +
- "[columns=<columns>][where_clause=<where_clause>]]': " + e.getMessage());
- }
+ "[columns=<columns>][where_clause=<where_clause>]" +
+ "[&bulk_cf_schema=bulk_cf_schema][&bulk_insert_statement=bulk_insert_statement]" +
+ "[&bulk_output_location=<bulk_output_location>][&bulk_buff_size=<bulk_buff_size>]" +
+ "[&storage_port=<storage_port>][&ssl_storage_port=<ssl_storage_port>]" +
+ "[&server_keystore=<server_keystore>][&server_keystore_pass=<server_keystore_pass>]" +
+ "[&server_truststore=<server_truststore>][&server_truststore_pass=<server_truststore_pass>]" +
+ "[&server_cipher_suites=<server_cipher_suites>][&internode_encrypt=<internode_encrypt>]" +
+ "[&bulk_stream_throttle=<bulk_stream_throttle>][&bulk_max_failed_hosts=<bulk_max_failed_hosts>]]': " + e.getMessage());
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7b40735/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index c7277fa..66583ec 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -38,4 +38,3 @@ public class CqlStorage extends CqlNativeStorage
super(pageSize);
}
}
-
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7b40735/src/java/org/apache/cassandra/tools/BulkLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java
index 88a4404..f4b30cb 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoader.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoader.java
@@ -254,7 +254,7 @@ public class BulkLoader
}
}
- static class ExternalClient extends SSTableLoader.Client
+ public static class ExternalClient extends SSTableLoader.Client
{
private final Map<String, CFMetaData> knownCfs = new HashMap<>();
private final Set<InetAddress> hosts;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7b40735/test/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index ec988e2..7be72dd 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -10,6 +10,7 @@ commitlog_segment_size_in_mb: 5
partitioner: org.apache.cassandra.dht.ByteOrderedPartitioner
listen_address: 127.0.0.1
storage_port: 7010
+ssl_storage_port: 7011
rpc_port: 9170
start_native_transport: true
native_transport_port: 9042
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7b40735/test/pig/org/apache/cassandra/pig/CqlTableTest.java
----------------------------------------------------------------------
diff --git a/test/pig/org/apache/cassandra/pig/CqlTableTest.java b/test/pig/org/apache/cassandra/pig/CqlTableTest.java
index 4ca043d..2e1758e 100644
--- a/test/pig/org/apache/cassandra/pig/CqlTableTest.java
+++ b/test/pig/org/apache/cassandra/pig/CqlTableTest.java
@@ -53,6 +53,10 @@ public class CqlTableTest extends PigTestBase
"CREATE INDEX test_b on test (b);",
"CREATE TABLE moredata (x int PRIMARY KEY, y int);",
+ "CREATE TABLE test_bulk (a int PRIMARY KEY, b int);",
+ "INSERT INTO test_bulk (a,b) VALUES (1,1);",
+ "INSERT INTO test_bulk (a,b) VALUES (2,2);",
+ "INSERT INTO test_bulk (a,b) VALUES (3,3);",
"INSERT INTO test (a,b) VALUES (1,1);",
"INSERT INTO test (a,b) VALUES (2,2);",
"INSERT INTO test (a,b) VALUES (3,3);",
@@ -160,10 +164,13 @@ public class CqlTableTest extends PigTestBase
//input_cql=select * from test where token(a) > ? and token(a) <= ?
pig.registerQuery("result= LOAD 'cql://cql3ks/test?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20test%20where%20token(a)%20%3E%20%3F%20and%20token(a)%20%3C%3D%20%3F' USING CqlNativeStorage();");
Iterator<Tuple> it = pig.openIterator("result");
+ int count = 0;
while (it.hasNext()) {
Tuple t = it.next();
Assert.assertEquals(t.get(0), t.get(1));
+ count ++;
}
+ Assert.assertEquals(6, count);
}
@Test
@@ -310,4 +317,33 @@ public class CqlTableTest extends PigTestBase
Assert.fail("Can't fetch any data");
}
}
+
+ @Test
+ public void testCqlStorageSingleKeyTableBulkLoad()
+ throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
+ {
+ pig.setBatchOn();
+ //input_cql=select * from moredata where token(x) > ? and token(x) <= ?
+ pig.registerQuery("moretestvalues= LOAD 'cql://cql3ks/moredata?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20moredata%20where%20token(x)%20%3E%20%3F%20and%20token(x)%20%3C%3D%20%3F' USING CqlNativeStorage();");
+ pig.registerQuery("insertformat= FOREACH moretestvalues GENERATE TOTUPLE(x, y);");
+ pig.registerQuery("STORE insertformat INTO 'cql://cql3ks/test_bulk?" + defaultParameters + nativeParameters + "&bulk_output_format=true&bulk_cf_schema=CREATE%20TABLE%20cql3ks.test_bulk%20(a%20int%20PRIMARY%20KEY%2C%20b%20int)&bulk_insert_statement=Insert%20into%20cql3ks.test_bulk(a%2C%20b)%20values(%3F%2C%3F)' USING CqlNativeStorage();");
+ pig.executeBatch();
+
+ //(5,5)
+ //(6,6)
+ //(4,4)
+ //(2,2)
+ //(3,3)
+ //(1,1)
+ //input_cql=select * from test_bulk1 where token(a) > ? and token(a) <= ?
+ pig.registerQuery("result= LOAD 'cql://cql3ks/test_bulk?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20test_bulk%20where%20token(a)%20%3E%20%3F%20and%20token(a)%20%3C%3D%20%3F' USING CqlNativeStorage();");
+ Iterator<Tuple> it = pig.openIterator("result");
+ int count = 0;
+ while (it.hasNext()) {
+ Tuple t = it.next();
+ Assert.assertEquals(t.get(0), t.get(1));
+ count ++;
+ }
+ Assert.assertEquals(6, count);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7b40735/test/pig/org/apache/cassandra/pig/PigTestBase.java
----------------------------------------------------------------------
diff --git a/test/pig/org/apache/cassandra/pig/PigTestBase.java b/test/pig/org/apache/cassandra/pig/PigTestBase.java
index 4b3e422..e6964f8 100644
--- a/test/pig/org/apache/cassandra/pig/PigTestBase.java
+++ b/test/pig/org/apache/cassandra/pig/PigTestBase.java
@@ -65,7 +65,8 @@ public class PigTestBase extends SchemaLoader
protected static Configuration conf;
protected static MiniCluster cluster;
protected static PigServer pig;
- protected static String defaultParameters= "init_address=localhost&rpc_port=9170&partitioner=org.apache.cassandra.dht.ByteOrderedPartitioner";
+ protected static String defaultParameters= "init_address=localhost&rpc_port=9170&partitioner=org.apache.cassandra.dht.ByteOrderedPartitioner" +
+ "&storage_port=7010&ssl_storage_port=7011&internode_encrypt=NONE";
protected static String nativeParameters = "&core_conns=2&max_conns=10&min_simult_reqs=3&max_simult_reqs=10&native_timeout=10000000" +
"&native_read_timeout=10000000&send_buff_size=4096&receive_buff_size=4096&solinger=3" +
"&tcp_nodelay=true&reuse_address=true&keep_alive=true&native_port=9042";
[2/2] cassandra git commit: Merge branch 'cassandra-2.1' into
cassandra-2.2
Posted by al...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cd4a1e6a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cd4a1e6a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cd4a1e6a
Branch: refs/heads/cassandra-2.2
Commit: cd4a1e6acc51b0f708127a50d37dd7832bbe8dfa
Parents: 9791796 c7b4073
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Tue Sep 15 16:12:45 2015 +0100
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Tue Sep 15 16:12:45 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../hadoop/AbstractColumnFamilyInputFormat.java | 2 +
.../hadoop/cql3/CqlBulkRecordWriter.java | 13 +-
.../cassandra/hadoop/pig/CqlNativeStorage.java | 171 ++++++++++++-------
test/conf/cassandra_pig.yaml | 41 +++++
.../org/apache/cassandra/pig/CqlTableTest.java | 35 ++++
.../org/apache/cassandra/pig/PigTestBase.java | 3 +-
7 files changed, 205 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd4a1e6a/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index bd24781,5f11049..b0ade42
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,17 -1,8 +1,18 @@@
-2.1.10
+2.2.2
+ * Defer default role manager setup until all nodes are on 2.2+ (CASSANDRA-9761)
+ * Cancel transaction for sstables we wont redistribute index summary
+ for (CASSANDRA-10270)
+ * Handle missing RoleManager in config after upgrade to 2.2 (CASSANDRA-10209)
+ * Retry snapshot deletion after compaction and gc on Windows (CASSANDRA-10222)
+ * Fix failure to start with space in directory path on Windows (CASSANDRA-10239)
+ * Fix repair hang when snapshot failed (CASSANDRA-10057)
+ * Fall back to 1/4 commitlog volume for commitlog_total_space on small disks
+ (CASSANDRA-10199)
+Merged from 2.1:
+ * (Pig) support BulkOutputFormat as a URL parameter (CASSANDRA-7410)
* BATCH statement is broken in cqlsh (CASSANDRA-10272)
* Added configurable warning threshold for GC duration (CASSANDRA-8907)
- * (cqlsh) Make cqlsh PEP8 compliant (CASSANDRA-10066)
+ * (cqlsh) Make cqlsh PEP8 Compliant (CASSANDRA-10066)
* (cqlsh) Fix error when starting cqlsh with --debug (CASSANDRA-10282)
* Scrub, Cleanup and Upgrade do not unmark compacting until all operations
have completed, regardless of the occurence of exceptions (CASSANDRA-10274)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd4a1e6a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
index 4dd53ff,e8de0f2..9c45bfe
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
@@@ -24,23 -32,31 +24,24 @@@ import java.util.concurrent.*
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.auth.IAuthenticator;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TokenRange;
++
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.OrderPreservingPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.thrift.AuthenticationRequest;
-import org.apache.cassandra.thrift.Cassandra;
-import org.apache.cassandra.thrift.CfSplit;
-import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.hadoop.cql3.*;
import org.apache.cassandra.thrift.KeyRange;
-import org.apache.cassandra.thrift.TokenRange;
-import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.thrift.TApplicationException;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-
+import org.apache.hadoop.mapreduce.*;
public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<K, Y> implements org.apache.hadoop.mapred.InputFormat<K, Y>
{
@@@ -226,64 -271,83 +227,65 @@@
}
}
- private List<CfSplit> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException
+ private Map<TokenRange, Long> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException
{
- int splitsize = ConfigHelper.getInputSplitSize(conf);
- for (int i = 0; i < range.rpc_endpoints.size(); i++)
+ int splitSize = ConfigHelper.getInputSplitSize(conf);
+ try
{
- String host = range.rpc_endpoints.get(i);
-
- if (host == null || host.equals("0.0.0.0"))
- host = range.endpoints.get(i);
-
- try
- {
- Cassandra.Client client = ConfigHelper.createConnection(conf, host, ConfigHelper.getInputRpcPort(conf));
- client.set_keyspace(keyspace);
-
- try
- {
- return client.describe_splits_ex(cfName, range.start_token, range.end_token, splitsize);
- }
- catch (TApplicationException e)
- {
- // fallback to guessing split size if talking to a server without describe_splits_ex method
- if (e.getType() == TApplicationException.UNKNOWN_METHOD)
- {
- List<String> splitPoints = client.describe_splits(cfName, range.start_token, range.end_token, splitsize);
- return tokenListToSplits(splitPoints, splitsize);
- }
- throw e;
- }
- }
- catch (IOException e)
- {
- logger.debug("failed connect to endpoint {}", host, e);
- }
- catch (InvalidRequestException e)
- {
- throw new RuntimeException(e);
- }
- catch (TException e)
- {
- throw new RuntimeException(e);
- }
+ return describeSplits(keyspace, cfName, range, splitSize);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
}
- throw new IOException("failed connecting to all endpoints " + StringUtils.join(range.endpoints, ","));
}
- private List<CfSplit> tokenListToSplits(List<String> splitTokens, int splitsize)
+ private Map<TokenRange, Set<Host>> getRangeMap(Configuration conf, String keyspace)
{
- List<CfSplit> splits = Lists.newArrayListWithExpectedSize(splitTokens.size() - 1);
- for (int j = 0; j < splitTokens.size() - 1; j++)
- splits.add(new CfSplit(splitTokens.get(j), splitTokens.get(j + 1), splitsize));
- return splits;
+ try (Session session = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf).connect())
+ {
+ Map<TokenRange, Set<Host>> map = new HashMap<>();
+ Metadata metadata = session.getCluster().getMetadata();
+ for (TokenRange tokenRange : metadata.getTokenRanges())
+ map.put(tokenRange, metadata.getReplicas('"' + keyspace + '"', tokenRange));
+ return map;
+ }
}
- private List<TokenRange> getRangeMap(Configuration conf) throws IOException
+ private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, int splitSize)
{
- Cassandra.Client client = ConfigHelper.getClientFromInputAddressList(conf);
-
- List<TokenRange> map;
- try
+ String query = String.format("SELECT mean_partition_size, partitions_count " +
+ "FROM %s.%s " +
+ "WHERE keyspace_name = ? AND table_name = ? AND range_start = ? AND range_end = ?",
+ SystemKeyspace.NAME,
+ SystemKeyspace.SIZE_ESTIMATES);
+
+ ResultSet resultSet = session.execute(query, keyspace, table, tokenRange.getStart().toString(), tokenRange.getEnd().toString());
+
+ Row row = resultSet.one();
+ // If we have no data on this split, return the full split i.e., do not sub-split
+ // Assume smallest granularity of partition count available from CASSANDRA-7688
+ if (row == null)
{
- map = client.describe_local_ring(ConfigHelper.getInputKeyspace(conf));
+ Map<TokenRange, Long> wrappedTokenRange = new HashMap<>();
+ wrappedTokenRange.put(tokenRange, (long) 128);
+ return wrappedTokenRange;
}
- catch (InvalidRequestException e)
- {
- throw new RuntimeException(e);
- }
- catch (TException e)
- {
- throw new RuntimeException(e);
- }
- return map;
+
+ long meanPartitionSize = row.getLong("mean_partition_size");
+ long partitionCount = row.getLong("partitions_count");
+
+ int splitCount = (int)((meanPartitionSize * partitionCount) / splitSize);
++ if (splitCount <= 0) splitCount = 1;
+ List<TokenRange> splitRanges = tokenRange.splitEvenly(splitCount);
+ Map<TokenRange, Long> rangesWithLength = new HashMap<>();
+ for (TokenRange range : splitRanges)
+ rangesWithLength.put(range, partitionCount/splitCount);
+
+ return rangesWithLength;
}
- //
// Old Hadoop API
- //
public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException
{
TaskAttemptContext tac = HadoopCompat.newTaskAttemptContext(jobConf, new TaskAttemptID());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd4a1e6a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
index e77c4c8,ced8aa9..9e6e23b
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
@@@ -23,16 -22,15 +23,17 @@@ import java.io.IOException
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
+import java.util.*;
+import java.util.concurrent.*;
-import org.apache.cassandra.config.EncryptionOptions;
-import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
++import org.apache.cassandra.dht.IPartitioner;
++import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.hadoop.AbstractBulkRecordWriter;
import org.apache.cassandra.hadoop.BulkRecordWriter;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.hadoop.HadoopCompat;
@@@ -84,6 -67,6 +85,7 @@@ public class CqlBulkRecordWriter extend
private String insertStatement;
private File outputDir;
private boolean deleteSrc;
++ private IPartitioner partitioner;
CqlBulkRecordWriter(TaskAttemptContext context) throws IOException
{
@@@ -113,55 -90,45 +115,64 @@@
{
// if anything is missing, exceptions will be thrown here, instead of on write()
keyspace = ConfigHelper.getOutputKeyspace(conf);
- columnFamily = ConfigHelper.getOutputColumnFamily(conf);
- schema = CqlBulkOutputFormat.getColumnFamilySchema(conf, columnFamily);
- insertStatement = CqlBulkOutputFormat.getColumnFamilyInsertStatement(conf, columnFamily);
- outputDir = getColumnFamilyDirectory();
+ table = ConfigHelper.getOutputColumnFamily(conf);
+
+ // check if table is aliased
+ String aliasedCf = CqlBulkOutputFormat.getTableForAlias(conf, table);
+ if (aliasedCf != null)
+ table = aliasedCf;
+
+ schema = CqlBulkOutputFormat.getTableSchema(conf, table);
+ insertStatement = CqlBulkOutputFormat.getTableInsertStatement(conf, table);
+ outputDir = getTableDirectory();
deleteSrc = CqlBulkOutputFormat.getDeleteSourceOnSuccess(conf);
++ try
++ {
++ partitioner = ConfigHelper.getInputPartitioner(conf);
++ }
++ catch (Exception e)
++ {
++ partitioner = Murmur3Partitioner.instance;
++ }
+ }
+
+ protected String getOutputLocation() throws IOException
+ {
+ String dir = conf.get(OUTPUT_LOCATION, System.getProperty("java.io.tmpdir"));
+ if (dir == null)
+ throw new IOException("Output directory not defined, if hadoop is not setting java.io.tmpdir then define " + OUTPUT_LOCATION);
+ return dir;
}
-
private void prepareWriter() throws IOException
{
- try
+ if (writer == null)
{
- if (writer == null)
- {
- writer = CQLSSTableWriter.builder()
- .forTable(schema)
- .using(insertStatement)
- .withPartitioner(ConfigHelper.getOutputPartitioner(conf))
- .inDirectory(outputDir)
- .withBufferSizeInMB(Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64")))
- .build();
- }
- if (loader == null)
- {
- BulkLoader.ExternalClient externalClient = getExternalClient(conf);
- this.loader = new SSTableLoader(outputDir, externalClient, new BulkRecordWriter.NullOutputHandler()) {
- @Override
- public void onSuccess(StreamState finalState)
- {
- if (deleteSrc)
- FileUtils.deleteRecursive(outputDir);
- }
- };
- }
+ writer = CQLSSTableWriter.builder()
+ .forTable(schema)
+ .using(insertStatement)
+ .withPartitioner(ConfigHelper.getOutputPartitioner(conf))
+ .inDirectory(outputDir)
+ .withBufferSizeInMB(Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64")))
++ .withPartitioner(partitioner)
+ .build();
}
- catch (Exception e)
+
+ if (loader == null)
{
- throw new IOException(e);
- }
+ ExternalClient externalClient = new ExternalClient(conf);
+ externalClient.setTableMetadata(CFMetaData.compile(schema, keyspace));
+
+ loader = new SSTableLoader(outputDir, externalClient, new BulkRecordWriter.NullOutputHandler())
+ {
+ @Override
+ public void onSuccess(StreamState finalState)
+ {
+ if (deleteSrc)
+ FileUtils.deleteRecursive(outputDir);
+ }
+ };
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd4a1e6a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
index dc3c174,5287bf5..223a848
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
@@@ -17,71 -17,47 +17,75 @@@
*/
package org.apache.cassandra.hadoop.pig;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
import java.util.*;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions.InternodeEncryption;
-import org.apache.cassandra.db.BufferCell;
-import org.apache.cassandra.db.Cell;
-import org.apache.cassandra.db.composites.CellNames;
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TableMetadata;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
++
import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.exceptions.AuthenticationException;
import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.hadoop.AbstractBulkRecordWriter;
+import org.apache.cassandra.exceptions.SyntaxException;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.hadoop.HadoopCompat;
+ import org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat;
++import org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter;
import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
import org.apache.cassandra.hadoop.cql3.CqlRecordReader;
-import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.serializers.CollectionSerializer;
import org.apache.cassandra.utils.*;
-import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.*;
-import org.apache.pig.Expression;
-import org.apache.pig.ResourceSchema;
+import org.apache.pig.*;
import org.apache.pig.Expression.OpType;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.*;
import org.apache.pig.impl.util.UDFContext;
-import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.external.biz.base64Coder.Base64Coder;
-import com.datastax.driver.core.Row;
-public class CqlNativeStorage extends AbstractCassandraStorage
+public class CqlNativeStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata
{
+ protected String DEFAULT_INPUT_FORMAT;
+ protected String DEFAULT_OUTPUT_FORMAT;
+
+ protected String username;
+ protected String password;
+ protected String keyspace;
+ protected String column_family;
+ protected String loadSignature;
+ protected String storeSignature;
+
+ protected Configuration conf;
+ protected String inputFormatClass;
+ protected String outputFormatClass;
+ protected int splitSize = 64 * 1024;
+ protected String partitionerClass;
+ protected boolean usePartitionFilter = false;
+ protected String initHostAddress;
+ protected String rpcPort;
+ protected int nativeProtocolVersion = 1;
+
private static final Logger logger = LoggerFactory.getLogger(CqlNativeStorage.class);
- public static String BULK_OUTPUT_FORMAT = "org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat";
++ private static String BULK_OUTPUT_FORMAT = "org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat";
private int pageSize = 1000;
private String columns;
private String outputQuery;
@@@ -109,6 -87,22 +113,16 @@@
private String nativeSSLCipherSuites;
private String inputCql;
+ private boolean bulkOutputFormat = false;
+ private String bulkCfSchema;
+ private String bulkInsertStatement;
+ private String bulkOutputLocation;
+ private int bulkBuffSize = -1;
+ private int bulkStreamThrottle = -1;
+ private int bulkMaxFailedHosts = -1;
- private int storagePort = CqlBulkOutputFormat.DEFAULT_STORAGE_PORT;
- private int sslStoragePort = CqlBulkOutputFormat.DEFAULT_SSL_STORAGE_PORT;
- private String serverKeystore;
- private String serverKeystorePassword;
- private String serverTruststore;
- private String serverTruststorePassword;
- private String serverCipherSuites;
- private String internodeEncrypt;
++ private boolean bulkDeleteSourceOnSuccess = true;
++ private String bulkTableAlias;
+
public CqlNativeStorage()
{
this(1000);
@@@ -241,83 -240,188 +255,48 @@@
return obj;
}
- /** include key columns */
- protected List<ColumnDef> getColumnMetadata(Cassandra.Client client)
- throws InvalidRequestException,
- UnavailableException,
- TimedOutException,
- SchemaDisagreementException,
- TException,
- CharacterCodingException,
- org.apache.cassandra.exceptions.InvalidRequestException,
- ConfigurationException,
- NotFoundException
- {
- List<ColumnDef> keyColumns = null;
- // get key columns
+ /** get the columnfamily definition for the signature */
+ protected TableInfo getCfInfo(String signature) throws IOException
+ {
+ UDFContext context = UDFContext.getUDFContext();
+ Properties property = context.getUDFProperties(CqlNativeStorage.class);
+ TableInfo cfInfo;
try
{
- keyColumns = getKeysMeta(client);
+ cfInfo = cfdefFromString(property.getProperty(signature));
}
- catch(Exception e)
+ catch (ClassNotFoundException e)
{
- logger.error("Error in retrieving key columns" , e);
+ throw new IOException(e);
}
-
- // get other columns
- List<ColumnDef> columns = getColumnMeta(client, false, !hasCompactValueAlias);
-
- // combine all columns in a list
- if (keyColumns != null && columns != null)
- keyColumns.addAll(columns);
-
- return keyColumns;
+ return cfInfo;
}
- /** get keys meta data */
- private List<ColumnDef> getKeysMeta(Cassandra.Client client)
- throws Exception
+ /** return the CfInfo for the column family */
+ protected TableMetadata getCfInfo(Session client)
+ throws NoHostAvailableException,
+ AuthenticationException,
+ IllegalStateException
{
- String query = "SELECT key_aliases, " +
- " column_aliases, " +
- " key_validator, " +
- " comparator, " +
- " keyspace_name, " +
- " value_alias, " +
- " default_validator " +
- "FROM system.schema_columnfamilies " +
- "WHERE keyspace_name = '%s'" +
- " AND columnfamily_name = '%s' ";
-
- CqlResult result = client.execute_cql3_query(
- ByteBufferUtil.bytes(String.format(query, keyspace, column_family)),
- Compression.NONE,
- ConsistencyLevel.ONE);
-
- if (result == null || result.rows == null || result.rows.isEmpty())
- return null;
-
- Iterator<CqlRow> iteraRow = result.rows.iterator();
- List<ColumnDef> keys = new ArrayList<ColumnDef>();
- if (iteraRow.hasNext())
- {
- CqlRow cqlRow = iteraRow.next();
- String name = ByteBufferUtil.string(cqlRow.columns.get(4).value);
- logger.debug("Found ksDef name: {}", name);
- String keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(0).getValue()));
-
- logger.debug("partition keys: {}", keyString);
- List<String> keyNames = FBUtilities.fromJsonList(keyString);
-
- Iterator<String> iterator = keyNames.iterator();
- while (iterator.hasNext())
- {
- ColumnDef cDef = new ColumnDef();
- cDef.name = ByteBufferUtil.bytes(iterator.next());
- keys.add(cDef);
- }
- // classic thrift tables
- if (keys.size() == 0)
- {
- CFMetaData cfm = getCFMetaData(keyspace, column_family, client);
- for (ColumnDefinition def : cfm.partitionKeyColumns())
- {
- String key = def.name.toString();
- logger.debug("name: {} ", key);
- ColumnDef cDef = new ColumnDef();
- cDef.name = ByteBufferUtil.bytes(key);
- keys.add(cDef);
- }
- for (ColumnDefinition def : cfm.clusteringColumns())
- {
- String key = def.name.toString();
- logger.debug("name: {} ", key);
- ColumnDef cDef = new ColumnDef();
- cDef.name = ByteBufferUtil.bytes(key);
- keys.add(cDef);
- }
- }
-
- keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(1).getValue()));
-
- logger.debug("cluster keys: {}", keyString);
- keyNames = FBUtilities.fromJsonList(keyString);
-
- iterator = keyNames.iterator();
- while (iterator.hasNext())
- {
- ColumnDef cDef = new ColumnDef();
- cDef.name = ByteBufferUtil.bytes(iterator.next());
- keys.add(cDef);
- }
-
- String validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(2).getValue()));
- logger.debug("row key validator: {}", validator);
- AbstractType<?> keyValidator = parseType(validator);
-
- Iterator<ColumnDef> keyItera = keys.iterator();
- if (keyValidator instanceof CompositeType)
- {
- Iterator<AbstractType<?>> typeItera = ((CompositeType) keyValidator).types.iterator();
- while (typeItera.hasNext())
- keyItera.next().validation_class = typeItera.next().toString();
- }
- else
- keyItera.next().validation_class = keyValidator.toString();
-
- validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(3).getValue()));
- logger.debug("cluster key validator: {}", validator);
-
- if (keyItera.hasNext() && validator != null && !validator.isEmpty())
- {
- AbstractType<?> clusterKeyValidator = parseType(validator);
-
- if (clusterKeyValidator instanceof CompositeType)
- {
- Iterator<AbstractType<?>> typeItera = ((CompositeType) clusterKeyValidator).types.iterator();
- while (keyItera.hasNext())
- keyItera.next().validation_class = typeItera.next().toString();
- }
- else
- keyItera.next().validation_class = clusterKeyValidator.toString();
- }
-
- // compact value_alias column
- if (cqlRow.columns.get(5).value != null)
- {
- try
- {
- String compactValidator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(6).getValue()));
- logger.debug("default validator: {}", compactValidator);
- AbstractType<?> defaultValidator = parseType(compactValidator);
-
- ColumnDef cDef = new ColumnDef();
- cDef.name = cqlRow.columns.get(5).value;
- cDef.validation_class = defaultValidator.toString();
- keys.add(cDef);
- hasCompactValueAlias = true;
- }
- catch (Exception e)
- {
- // no compact column at value_alias
- }
- }
-
- }
- return keys;
+ // get CF meta data
+ return client.getCluster().getMetadata().getKeyspace(Metadata.quote(keyspace)).getTable(Metadata.quote(column_family));
}
- /** output: (((name, value), (name, value)), (value ... value), (value...value)) */
- public void putNext(Tuple t) throws IOException
- {
- if (t.size() < 1)
- {
- // simply nothing here, we can't even delete without a key
- logger.warn("Empty output skipped, filter empty tuples to suppress this warning");
- return;
- }
-
- if (t.getType(0) == DataType.TUPLE)
- {
- if (t.getType(1) == DataType.TUPLE)
- {
- Map<String, ByteBuffer> key = tupleToKeyMap((Tuple)t.get(0));
- cqlQueryFromTuple(key, t, 1);
- }
- else
- throw new IOException("Second argument in output must be a tuple");
- }
- else
- throw new IOException("First argument in output must be a tuple");
- }
-
/** convert key tuple to key map */
private Map<String, ByteBuffer> tupleToKeyMap(Tuple t) throws IOException
{
Map<String, ByteBuffer> keys = new HashMap<String, ByteBuffer>();
for (int i = 0; i < t.size(); i++)
{
- if (t.getType(i) == DataType.TUPLE)
- {
- Tuple inner = (Tuple) t.get(i);
- if (inner.size() == 2)
- {
- Object name = inner.get(0);
- if (name != null)
- {
- keys.put(name.toString(), objToBB(inner.get(1)));
- }
- else
- throw new IOException("Key name was empty");
- }
- else
- throw new IOException("Keys were not in name and value pairs");
- }
- else
- {
+ if (t.getType(i) != DataType.TUPLE)
throw new IOException("keys was not a tuple");
- }
-
+ Tuple inner = (Tuple) t.get(i);
+ if (inner.size() != 2)
+ throw new IOException("Keys were not in name and value pairs");
+ Object name = inner.get(0);
+ if (name == null)
+ throw new IOException("Key name was empty");
+ keys.put(name.toString(), objToBB(inner.get(1)));
}
return keys;
}
@@@ -543,10 -537,41 +517,41 @@@
private String getWhereClauseForPartitionFilter()
{
UDFContext context = UDFContext.getUDFContext();
- Properties property = context.getUDFProperties(AbstractCassandraStorage.class);
- return property.getProperty(PARTITION_FILTER_SIGNATURE);
+ Properties property = context.getUDFProperties(CqlNativeStorage.class);
+ return property.getProperty(StorageHelper.PARTITION_FILTER_SIGNATURE);
}
+ /**
+ * output: (((name, value), (name, value)), (value ... value), (value...value))
+ * bulk output: ((value ... value), (value...value))
+ *
+ * */
+ public void putNext(Tuple t) throws IOException
+ {
+ if (t.size() < 1)
+ {
+ // simply nothing here, we can't even delete without a key
+ logger.warn("Empty output skipped, filter empty tuples to suppress this warning");
+ return;
+ }
+
+ if (t.getType(0) != DataType.TUPLE)
+ throw new IOException("First argument in output must be a tuple");
+
+ if (!bulkOutputFormat && t.getType(1) != DataType.TUPLE)
+ throw new IOException("Second argument in output must be a tuple");
+
+ if (bulkOutputFormat)
+ {
+ cqlQueryFromTuple(null, t, 0);
+ }
+ else
+ {
+ Map<String, ByteBuffer> key = tupleToKeyMap((Tuple)t.get(0));
+ cqlQueryFromTuple(key, t, 1);
+ }
+ }
+
/** set read configuration settings */
public void setLocation(String location, Job job) throws IOException
{
@@@ -672,6 -699,42 +677,32 @@@
ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family);
CqlConfigHelper.setOutputCql(conf, outputQuery);
+ if (bulkOutputFormat)
+ {
+ DEFAULT_OUTPUT_FORMAT = BULK_OUTPUT_FORMAT;
+ if (bulkCfSchema != null)
- CqlBulkOutputFormat.setColumnFamilySchema(conf, column_family, bulkCfSchema);
++ CqlBulkOutputFormat.setTableSchema(conf, column_family, bulkCfSchema);
+ else
+ throw new IOException("bulk_cf_schema is missing in input url parameter");
+ if (bulkInsertStatement != null)
- CqlBulkOutputFormat.setColumnFamilyInsertStatement(conf, column_family, bulkInsertStatement);
++ CqlBulkOutputFormat.setTableInsertStatement(conf, column_family, bulkInsertStatement);
+ else
+ throw new IOException("bulk_insert_statement is missing in input url parameter");
++ if (bulkTableAlias != null)
++ CqlBulkOutputFormat.setTableAlias(conf, bulkTableAlias, column_family);
++ CqlBulkOutputFormat.setDeleteSourceOnSuccess(conf, bulkDeleteSourceOnSuccess);
+ if (bulkOutputLocation != null)
- conf.set(AbstractBulkRecordWriter.OUTPUT_LOCATION, bulkOutputLocation);
++ conf.set(CqlBulkRecordWriter.OUTPUT_LOCATION, bulkOutputLocation);
+ if (bulkBuffSize > 0)
- conf.set(AbstractBulkRecordWriter.BUFFER_SIZE_IN_MB, String.valueOf(bulkBuffSize));
++ conf.set(CqlBulkRecordWriter.BUFFER_SIZE_IN_MB, String.valueOf(bulkBuffSize));
+ if (bulkStreamThrottle > 0)
- conf.set(AbstractBulkRecordWriter.STREAM_THROTTLE_MBITS, String.valueOf(bulkStreamThrottle));
++ conf.set(CqlBulkRecordWriter.STREAM_THROTTLE_MBITS, String.valueOf(bulkStreamThrottle));
+ if (bulkMaxFailedHosts > 0)
- conf.set(AbstractBulkRecordWriter.MAX_FAILED_HOSTS, String.valueOf(bulkMaxFailedHosts));
- CqlBulkOutputFormat.setSSLStoragePort(conf, sslStoragePort);
- CqlBulkOutputFormat.setStoragePort(conf, storagePort);
- if (serverEncrypted())
- {
- if (!StringUtils.isEmpty(serverKeystore))
- CqlBulkOutputFormat.setServerKeystore(conf, serverKeystore);
- if (!StringUtils.isEmpty(serverTruststore))
- CqlBulkOutputFormat.setServerTruststore(conf, serverTruststore);
- if (!StringUtils.isEmpty(serverKeystorePassword))
- CqlBulkOutputFormat.setServerKeystorePassword(conf, serverKeystorePassword);
- if (!StringUtils.isEmpty(serverTruststorePassword))
- CqlBulkOutputFormat.setServerTruststorePassword(conf, serverTruststorePassword);
- if (!StringUtils.isEmpty(serverCipherSuites))
- CqlBulkOutputFormat.setServerCipherSuites(conf, serverCipherSuites);
- }
++ conf.set(CqlBulkRecordWriter.MAX_FAILED_HOSTS, String.valueOf(bulkMaxFailedHosts));
++ if (partitionerClass!= null)
++ ConfigHelper.setInputPartitioner(conf, partitionerClass);
+ }
+
setConnectionInformation();
if (ConfigHelper.getOutputRpcPort(conf) == 0)
@@@ -773,6 -773,37 +804,25 @@@
if (urlQuery.containsKey("output_query"))
outputQuery = urlQuery.get("output_query");
+ if (urlQuery.containsKey("bulk_output_format"))
+ bulkOutputFormat = Boolean.valueOf(urlQuery.get("bulk_output_format"));
+ if (urlQuery.containsKey("bulk_cf_schema"))
+ bulkCfSchema = urlQuery.get("bulk_cf_schema");
+ if (urlQuery.containsKey("bulk_insert_statement"))
+ bulkInsertStatement = urlQuery.get("bulk_insert_statement");
+ if (urlQuery.containsKey("bulk_output_location"))
+ bulkOutputLocation = urlQuery.get("bulk_output_location");
+ if (urlQuery.containsKey("bulk_buff_size"))
+ bulkBuffSize = Integer.valueOf(urlQuery.get("bulk_buff_size"));
+ if (urlQuery.containsKey("bulk_stream_throttle"))
+ bulkStreamThrottle = Integer.valueOf(urlQuery.get("bulk_stream_throttle"));
+ if (urlQuery.containsKey("bulk_max_failed_hosts"))
+ bulkMaxFailedHosts = Integer.valueOf(urlQuery.get("bulk_max_failed_hosts"));
- if (urlQuery.containsKey("storage_port"))
- storagePort = Integer.valueOf(urlQuery.get("storage_port"));
- if (urlQuery.containsKey("ssl_storage_port"))
- sslStoragePort = Integer.valueOf(urlQuery.get("ssl_storage_port"));
- if (urlQuery.containsKey("internode_encrypt"))
- internodeEncrypt = urlQuery.get("internode_encrypt");
- if (urlQuery.containsKey("server_keystore"))
- serverKeystore = urlQuery.get("server_keystore");
- if (urlQuery.containsKey("server_truststore"))
- serverTruststore = urlQuery.get("server_truststore");
- if (urlQuery.containsKey("server_keystore_pass"))
- serverKeystorePassword = urlQuery.get("server_keystore_pass");
- if (urlQuery.containsKey("server_truststore_pass"))
- serverTruststorePassword = urlQuery.get("server_truststore_pass");
- if (urlQuery.containsKey("server_cipher_suites"))
- serverCipherSuites = urlQuery.get("server_cipher_suites");
++ if (urlQuery.containsKey("bulk_delete_source"))
++ bulkDeleteSourceOnSuccess = Boolean.parseBoolean(urlQuery.get("bulk_delete_source"));
++ if (urlQuery.containsKey("bulk_table_alias"))
++ bulkTableAlias = urlQuery.get("bulk_table_alias");
+
//split size
if (urlQuery.containsKey("split_size"))
splitSize = Integer.parseInt(urlQuery.get("split_size"));
@@@ -855,10 -888,20 +905,13 @@@
"[&keep_alive=<keep_alive>][&auth_provider=<auth_provider>][&trust_store_path=<trust_store_path>]" +
"[&key_store_path=<key_store_path>][&trust_store_password=<trust_store_password>]" +
"[&key_store_password=<key_store_password>][&cipher_suites=<cipher_suites>][&input_cql=<input_cql>]" +
- "[columns=<columns>][where_clause=<where_clause>]]': " + e.getMessage());
- }
+ "[columns=<columns>][where_clause=<where_clause>]" +
- "[&bulk_cf_schema=bulk_cf_schema][&bulk_insert_statement=bulk_insert_statement]" +
- "[&bulk_output_location=<bulk_output_location>][&bulk_buff_size=<bulk_buff_size>]" +
- "[&storage_port=<storage_port>][&ssl_storage_port=<ssl_storage_port>]" +
- "[&server_keystore=<server_keystore>][&server_keystore_pass=<server_keystore_pass>]" +
- "[&server_truststore=<server_truststore>][&server_truststore_pass=<server_truststore_pass>]" +
- "[&server_cipher_suites=<server_cipher_suites>][&internode_encrypt=<internode_encrypt>]" +
++ "[&bulk_cf_schema=bulk_cf_schema][&bulk_insert_statement=bulk_insert_statement][&bulk_table_alias=<bulk_table_alias>]" +
++ "[&bulk_output_location=<bulk_output_location>][&bulk_buff_size=<bulk_buff_size>][&bulk_delete_source=<bulk_delete_source>]" +
+ "[&bulk_stream_throttle=<bulk_stream_throttle>][&bulk_max_failed_hosts=<bulk_max_failed_hosts>]]': " + e.getMessage());
+ }
}
- /**
- * Thrift API can't handle null, so use empty byte array
- */
public ByteBuffer nullToBB()
{
return ByteBuffer.wrap(new byte[0]);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd4a1e6a/test/conf/cassandra_pig.yaml
----------------------------------------------------------------------
diff --cc test/conf/cassandra_pig.yaml
index 0000000,0000000..68615cf
new file mode 100644
--- /dev/null
+++ b/test/conf/cassandra_pig.yaml
@@@ -1,0 -1,0 +1,41 @@@
++#
++# Warning!
++# Consider the effects on 'o.a.c.i.s.LegacySSTableTest' before changing schemas in this file.
++#
++cluster_name: Test Cluster
++memtable_allocation_type: offheap_objects
++commitlog_sync: batch
++commitlog_sync_batch_window_in_ms: 1.0
++commitlog_segment_size_in_mb: 5
++commitlog_directory: build/test/cassandra/commitlog
++partitioner: org.apache.cassandra.dht.Murmur3Partitioner
++listen_address: 127.0.0.1
++storage_port: 7010
++rpc_port: 9170
++start_native_transport: true
++native_transport_port: 9042
++column_index_size_in_kb: 4
++saved_caches_directory: build/test/cassandra/saved_caches
++data_file_directories:
++ - build/test/cassandra/data
++disk_access_mode: mmap
++seed_provider:
++ - class_name: org.apache.cassandra.locator.SimpleSeedProvider
++ parameters:
++ - seeds: "127.0.0.1"
++endpoint_snitch: org.apache.cassandra.locator.SimpleSnitch
++dynamic_snitch: true
++request_scheduler: org.apache.cassandra.scheduler.RoundRobinScheduler
++request_scheduler_id: keyspace
++server_encryption_options:
++ internode_encryption: none
++ keystore: conf/.keystore
++ keystore_password: cassandra
++ truststore: conf/.truststore
++ truststore_password: cassandra
++incremental_backups: true
++concurrent_compactors: 4
++compaction_throughput_mb_per_sec: 0
++row_cache_class_name: org.apache.cassandra.cache.OHCProvider
++row_cache_size_in_mb: 16
++enable_user_defined_functions: true
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd4a1e6a/test/pig/org/apache/cassandra/pig/CqlTableTest.java
----------------------------------------------------------------------
diff --cc test/pig/org/apache/cassandra/pig/CqlTableTest.java
index 72fdd5a,2e1758e..3902fce
--- a/test/pig/org/apache/cassandra/pig/CqlTableTest.java
+++ b/test/pig/org/apache/cassandra/pig/CqlTableTest.java
@@@ -265,4 -317,33 +272,32 @@@ public class CqlTableTest extends PigTe
Assert.fail("Can't fetch any data");
}
}
+
+ @Test
- public void testCqlStorageSingleKeyTableBulkLoad()
- throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
++ public void testCqlStorageSingleKeyTableBulkLoad() throws TException, IOException
+ {
+ pig.setBatchOn();
+ //input_cql=select * from moredata where token(x) > ? and token(x) <= ?
+ pig.registerQuery("moretestvalues= LOAD 'cql://cql3ks/moredata?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20moredata%20where%20token(x)%20%3E%20%3F%20and%20token(x)%20%3C%3D%20%3F' USING CqlNativeStorage();");
+ pig.registerQuery("insertformat= FOREACH moretestvalues GENERATE TOTUPLE(x, y);");
+ pig.registerQuery("STORE insertformat INTO 'cql://cql3ks/test_bulk?" + defaultParameters + nativeParameters + "&bulk_output_format=true&bulk_cf_schema=CREATE%20TABLE%20cql3ks.test_bulk%20(a%20int%20PRIMARY%20KEY%2C%20b%20int)&bulk_insert_statement=Insert%20into%20cql3ks.test_bulk(a%2C%20b)%20values(%3F%2C%3F)' USING CqlNativeStorage();");
+ pig.executeBatch();
+
+ //(5,5)
+ //(6,6)
+ //(4,4)
+ //(2,2)
+ //(3,3)
+ //(1,1)
+ //input_cql=select * from test_bulk where token(a) > ? and token(a) <= ?
+ pig.registerQuery("result= LOAD 'cql://cql3ks/test_bulk?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20test_bulk%20where%20token(a)%20%3E%20%3F%20and%20token(a)%20%3C%3D%20%3F' USING CqlNativeStorage();");
+ Iterator<Tuple> it = pig.openIterator("result");
+ int count = 0;
+ while (it.hasNext()) {
+ Tuple t = it.next();
+ Assert.assertEquals(t.get(0), t.get(1));
+ count ++;
+ }
+ Assert.assertEquals(6, count);
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd4a1e6a/test/pig/org/apache/cassandra/pig/PigTestBase.java
----------------------------------------------------------------------
diff --cc test/pig/org/apache/cassandra/pig/PigTestBase.java
index 8c27f6c,e6964f8..a8a9de5
--- a/test/pig/org/apache/cassandra/pig/PigTestBase.java
+++ b/test/pig/org/apache/cassandra/pig/PigTestBase.java
@@@ -54,7 -65,8 +54,7 @@@ public class PigTestBase extends Schema
protected static Configuration conf;
protected static MiniCluster cluster;
protected static PigServer pig;
- protected static String defaultParameters= "init_address=localhost&rpc_port=9170&partitioner=org.apache.cassandra.dht.ByteOrderedPartitioner";
- protected static String defaultParameters= "init_address=localhost&rpc_port=9170&partitioner=org.apache.cassandra.dht.ByteOrderedPartitioner" +
- "&storage_port=7010&ssl_storage_port=7011&internode_encrypt=NONE";
++ protected static String defaultParameters= "init_address=localhost&rpc_port=9170&partitioner=org.apache.cassandra.dht.Murmur3Partitioner";
protected static String nativeParameters = "&core_conns=2&max_conns=10&min_simult_reqs=3&max_simult_reqs=10&native_timeout=10000000" +
"&native_read_timeout=10000000&send_buff_size=4096&receive_buff_size=4096&solinger=3" +
"&tcp_nodelay=true&reuse_address=true&keep_alive=true&native_port=9042";
@@@ -62,6 -74,6 +62,7 @@@
static
{
System.setProperty("logback.configurationFile", "logback-test.xml");
++ System.setProperty("cassandra.config", "cassandra_pig.yaml");
}
@AfterClass