Posted to commits@cassandra.apache.org by al...@apache.org on 2015/09/15 17:12:56 UTC

[1/2] cassandra git commit: (Pig) support BulkOutputFormat as a URL parameter

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.2 979179609 -> cd4a1e6ac


(Pig) support BulkOutputFormat as a URL parameter

patch by Alex Liu; reviewed by Piotr Kołaczkowski for CASSANDRA-7410


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c7b40735
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c7b40735
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c7b40735

Branch: refs/heads/cassandra-2.2
Commit: c7b40735789c840529002eb3c11d8731f460d61c
Parents: ae51086
Author: Alex Liu <al...@yahoo.com>
Authored: Tue Sep 15 16:06:18 2015 +0100
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Tue Sep 15 16:08:54 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../hadoop/cql3/CqlBulkOutputFormat.java        |  93 +++++++-
 .../hadoop/cql3/CqlBulkRecordWriter.java        |  87 ++++----
 .../cassandra/hadoop/pig/CqlNativeStorage.java  | 213 +++++++++++++------
 .../apache/cassandra/hadoop/pig/CqlStorage.java |   1 -
 .../org/apache/cassandra/tools/BulkLoader.java  |   2 +-
 test/conf/cassandra.yaml                        |   1 +
 .../org/apache/cassandra/pig/CqlTableTest.java  |  36 ++++
 .../org/apache/cassandra/pig/PigTestBase.java   |   3 +-
 9 files changed, 336 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
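
The feature is exercised end to end by the new testCqlStorageSingleKeyTableBulkLoad
test below. Condensed, the bulk path looks like this (a minimal sketch in the style
of PigTestBase; the cql3ks keyspace, the moredata/test_bulk tables and the
URL-encoded schema and insert statement are the test's own fixtures, not required
names):

    // Read through the regular input path, then write through
    // CqlBulkOutputFormat by adding bulk_output_format=true plus the
    // mandatory, URL-encoded bulk_cf_schema and bulk_insert_statement
    // parameters to the store URL.
    pig.setBatchOn();
    pig.registerQuery("rows = LOAD 'cql://cql3ks/moredata?" + defaultParameters + nativeParameters +
        "&input_cql=select%20*%20from%20moredata%20where%20token(x)%20%3E%20%3F%20and%20token(x)%20%3C%3D%20%3F' USING CqlNativeStorage();");
    pig.registerQuery("out = FOREACH rows GENERATE TOTUPLE(x, y);");
    pig.registerQuery("STORE out INTO 'cql://cql3ks/test_bulk?" + defaultParameters + nativeParameters +
        "&bulk_output_format=true" +
        "&bulk_cf_schema=CREATE%20TABLE%20cql3ks.test_bulk%20(a%20int%20PRIMARY%20KEY%2C%20b%20int)" +
        "&bulk_insert_statement=Insert%20into%20cql3ks.test_bulk(a%2C%20b)%20values(%3F%2C%3F)' USING CqlNativeStorage();");
    pig.executeBatch();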


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7b40735/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index dff47fc..5f11049 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.10
+ * (Pig) support BulkOutputFormat as a URL parameter (CASSANDRA-7410)
  * BATCH statement is broken in cqlsh (CASSANDRA-10272)
  * Added configurable warning threshold for GC duration (CASSANDRA-8907)
  * (cqlsh) Make cqlsh PEP8 compliant (CASSANDRA-10066)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7b40735/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
index 887fe8e..7fedb41 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 
+import org.apache.cassandra.config.EncryptionOptions;
 import org.apache.cassandra.hadoop.AbstractBulkOutputFormat;
 import org.apache.cassandra.hadoop.ConfigHelper;
 import org.apache.hadoop.conf.Configuration;
@@ -54,6 +55,16 @@ public class CqlBulkOutputFormat extends AbstractBulkOutputFormat<Object, List<B
     private static final String OUTPUT_CQL_SCHEMA_PREFIX = "cassandra.columnfamily.schema.";
     private static final String OUTPUT_CQL_INSERT_PREFIX = "cassandra.columnfamily.insert.";
     private static final String DELETE_SOURCE = "cassandra.output.delete.source";
+    private static final String OUTPUT_CQL_STORAGE_PORT = "cassandra.storage.port";
+    private static final String OUTPUT_CQL_SSL_STORAGE_PORT = "cassandra.ssl.storage.port";
+    private static final String INTERNODE_ENCRYPTION = "cassandra.internode.encryption";
+    private static final String SERVER_KEYSTORE = "cassandra.server.keystore";
+    private static final String SERVER_KEYSTORE_PASSWORD = "cassandra.server.keystore.password";
+    private static final String SERVER_TRUSTSTORE = "cassandra.server.truststore";
+    private static final String SERVER_TRUSTSTORE_PASSWORD = "cassandra.server.truststore.password";
+    private static final String SERVER_CIPHER_SUITES = "cassandra.server.cipher.suites";
+    public static final int DEFAULT_STORAGE_PORT = 7000;
+    public static final int DEFAULT_SSL_STORAGE_PORT = 7001;
   
     /** Fills the deprecated OutputFormat interface for streaming. */
     @Deprecated
@@ -84,7 +95,87 @@ public class CqlBulkOutputFormat extends AbstractBulkOutputFormat<Object, List<B
     {
         conf.set(OUTPUT_CQL_INSERT_PREFIX + columnFamily, insertStatement);
     }
-    
+
+    public static void setStoragePort(Configuration conf, int port)
+    {
+        conf.set(OUTPUT_CQL_STORAGE_PORT, String.valueOf(port));
+    }
+
+    public static void setSSLStoragePort(Configuration conf, int port)
+    {
+        conf.set(OUTPUT_CQL_SSL_STORAGE_PORT, String.valueOf(port));
+    }
+
+    public static void setInternodeEncryption(Configuration conf, String encrypt)
+    {
+        conf.set(INTERNODE_ENCRYPTION, encrypt);
+    }
+
+    public static void setServerKeystore(Configuration conf, String keystore)
+    {
+        conf.set(SERVER_KEYSTORE, keystore);
+    }
+
+    public static void setServerKeystorePassword(Configuration conf, String keystorePass)
+    {
+        conf.set(SERVER_KEYSTORE_PASSWORD, keystorePass);
+    }
+
+    public static void setServerTruststore(Configuration conf, String truststore)
+    {
+        conf.set(SERVER_TRUSTSTORE, truststore);
+    }
+
+    public static void setServerTruststorePassword(Configuration conf, String truststorePass)
+    {
+        conf.set(SERVER_TRUSTSTORE_PASSWORD, truststorePass);
+    }
+
+    public static void setServerCipherSuites(Configuration conf, String cipherSuites)
+    {
+        conf.set(SERVER_CIPHER_SUITES, cipherSuites);
+    }
+
+    public static int getStoragePort(Configuration conf)
+    {
+        return conf.getInt(OUTPUT_CQL_STORAGE_PORT, DEFAULT_STORAGE_PORT);
+    }
+
+    public static int getSSLStoragePort(Configuration conf)
+    {
+        return conf.getInt(OUTPUT_CQL_SSL_STORAGE_PORT, DEFAULT_SSL_STORAGE_PORT);
+    }
+
+    public static String getInternodeEncryption(Configuration conf)
+    {
+        return conf.get(INTERNODE_ENCRYPTION, EncryptionOptions.ServerEncryptionOptions.InternodeEncryption.none.name());
+    }
+
+    public static String getServerKeystore(Configuration conf)
+    {
+        return conf.get(SERVER_KEYSTORE);
+    }
+
+    public static String getServerTruststore(Configuration conf)
+    {
+        return conf.get(SERVER_TRUSTSTORE);
+    }
+
+    public static String getServerKeystorePassword(Configuration conf)
+    {
+        return conf.get(SERVER_KEYSTORE_PASSWORD);
+    }
+
+    public static String getServerTruststorePassword(Configuration conf)
+    {
+        return conf.get(SERVER_TRUSTSTORE_PASSWORD);
+    }
+
+    public static String getServerCipherSuites(Configuration conf)
+    {
+        return conf.get(SERVER_CIPHER_SUITES);
+    }
+
     public static String getColumnFamilySchema(Configuration conf, String columnFamily)
     {
         String schema = conf.get(OUTPUT_CQL_SCHEMA_PREFIX + columnFamily);
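
The accessors added above are plain Hadoop Configuration knobs. A hypothetical
job setup might look like the following sketch; the paths, passwords and cipher
names are placeholders, and "all" is one of the InternodeEncryption enum values
(none, all, dc, rack):

    // given a Hadoop Configuration conf for the job
    CqlBulkOutputFormat.setStoragePort(conf, 7000);     // DEFAULT_STORAGE_PORT
    CqlBulkOutputFormat.setSSLStoragePort(conf, 7001);  // DEFAULT_SSL_STORAGE_PORT
    CqlBulkOutputFormat.setInternodeEncryption(conf, "all");
    CqlBulkOutputFormat.setServerKeystore(conf, "/path/to/keystore");
    CqlBulkOutputFormat.setServerKeystorePassword(conf, "keystore-pass");
    CqlBulkOutputFormat.setServerTruststore(conf, "/path/to/truststore");
    CqlBulkOutputFormat.setServerTruststorePassword(conf, "truststore-pass");
    // comma-separated; CqlBulkRecordWriter strips spaces and splits on ","
    CqlBulkOutputFormat.setServerCipherSuites(conf,
        "TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA");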

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7b40735/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
index e60a240..ced8aa9 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
@@ -19,13 +19,16 @@ package org.apache.cassandra.hadoop.cql3;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
-import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 
-import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.EncryptionOptions;
+import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.hadoop.AbstractBulkRecordWriter;
 import org.apache.cassandra.hadoop.BulkRecordWriter;
@@ -35,6 +38,9 @@ import org.apache.cassandra.io.sstable.CQLSSTableWriter;
 import org.apache.cassandra.io.sstable.SSTableLoader;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.thrift.ITransportFactory;
+import org.apache.cassandra.tools.BulkLoader;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.util.Progressable;
@@ -108,10 +114,7 @@ public class CqlBulkRecordWriter extends AbstractBulkRecordWriter<Object, List<B
             }
             if (loader == null)
             {
-                ExternalClient externalClient = new ExternalClient(conf);
-                
-                externalClient.addKnownCfs(keyspace, schema);
-
+                BulkLoader.ExternalClient externalClient = getExternalClient(conf);
                 this.loader = new SSTableLoader(outputDir, externalClient, new BulkRecordWriter.NullOutputHandler()) {
                     @Override
                     public void onSuccess(StreamState finalState)
@@ -171,41 +174,53 @@ public class CqlBulkRecordWriter extends AbstractBulkRecordWriter<Object, List<B
         
         return dir;
     }
-    
-    public static class ExternalClient extends AbstractBulkRecordWriter.ExternalClient
-    {
-        private Map<String, Map<String, CFMetaData>> knownCqlCfs = new HashMap<>();
-        
-        public ExternalClient(Configuration conf)
-        {
-            super(conf);
-        }
 
-        public void addKnownCfs(String keyspace, String cql)
+    private BulkLoader.ExternalClient getExternalClient(Configuration conf)
+    {
+        Set<InetAddress> hosts = new HashSet<InetAddress>();
+        String outputAddress = ConfigHelper.getOutputInitialAddress(conf);
+        if (outputAddress == null) outputAddress = "localhost";
+        String[] nodes = outputAddress.split(",");
+        for (String node : nodes)
         {
-            Map<String, CFMetaData> cfs = knownCqlCfs.get(keyspace);
-            
-            if (cfs == null)
+            try
             {
-                cfs = new HashMap<>();
-                knownCqlCfs.put(keyspace, cfs);
+                hosts.add(InetAddress.getByName(node));
             }
-            
-            CFMetaData metadata = CFMetaData.compile(cql, keyspace);
-            cfs.put(metadata.cfName, metadata);
-        }
-        
-        @Override
-        public CFMetaData getCFMetaData(String keyspace, String cfName)
-        {
-            CFMetaData metadata = super.getCFMetaData(keyspace, cfName);
-            if (metadata != null)
+            catch (UnknownHostException e)
             {
-                return metadata;
+                throw new RuntimeException(e);
             }
-            
-            Map<String, CFMetaData> cfs = knownCqlCfs.get(keyspace);
-            return cfs != null ? cfs.get(cfName) : null;
         }
+        int rpcPort = ConfigHelper.getOutputRpcPort(conf);
+        String username = ConfigHelper.getOutputKeyspaceUserName(conf);
+        String password = ConfigHelper.getOutputKeyspacePassword(conf);
+        ITransportFactory transportFactory = ConfigHelper.getClientTransportFactory(conf);
+        return new BulkLoader.ExternalClient(hosts,
+                rpcPort,
+                username,
+                password,
+                transportFactory,
+                CqlBulkOutputFormat.getStoragePort(conf),
+                CqlBulkOutputFormat.getSSLStoragePort(conf),
+                getServerEncryptOpt(conf));
+    }
+
+    private ServerEncryptionOptions getServerEncryptOpt(Configuration conf)
+    {
+        ServerEncryptionOptions encryptOpt = new ServerEncryptionOptions();
+        String internodeEncrypt = CqlBulkOutputFormat.getInternodeEncryption(conf);
+        if (StringUtils.isEmpty(internodeEncrypt))
+            return encryptOpt;
+
+        encryptOpt.internode_encryption = EncryptionOptions.ServerEncryptionOptions.InternodeEncryption.valueOf(internodeEncrypt);
+        encryptOpt.keystore = CqlBulkOutputFormat.getServerKeystore(conf);
+        encryptOpt.truststore = CqlBulkOutputFormat.getServerTruststore(conf);
+        encryptOpt.keystore_password = CqlBulkOutputFormat.getServerKeystorePassword(conf);
+        encryptOpt.truststore_password = CqlBulkOutputFormat.getServerTruststorePassword(conf);
+        String cipherSuites = CqlBulkOutputFormat.getServerCipherSuites(conf);
+        if (!StringUtils.isEmpty(cipherSuites))
+            encryptOpt.cipher_suites = cipherSuites.replace(" ", "").split(",");
+        return encryptOpt;
     }
 }
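
One operational note on getExternalClient(): the seed list comes from
ConfigHelper.getOutputInitialAddress() and silently falls back to "localhost",
so a real job should set it explicitly, e.g. (addresses and port below are
placeholders):

    ConfigHelper.setOutputInitialAddress(conf, "10.0.0.1,10.0.0.2");
    ConfigHelper.setOutputRpcPort(conf, "9160");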

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7b40735/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
index 7887085..5287bf5 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
@@ -24,18 +24,21 @@ import java.util.*;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions.InternodeEncryption;
 import org.apache.cassandra.db.BufferCell;
 import org.apache.cassandra.db.Cell;
 import org.apache.cassandra.db.composites.CellNames;
 import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.hadoop.AbstractBulkRecordWriter;
 import org.apache.cassandra.hadoop.ConfigHelper;
 import org.apache.cassandra.hadoop.HadoopCompat;
+import org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat;
 import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
 import org.apache.cassandra.hadoop.cql3.CqlRecordReader;
 import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.thrift.ConsistencyLevel;
 import org.apache.cassandra.utils.*;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.mapreduce.*;
 import org.apache.pig.Expression;
 import org.apache.pig.ResourceSchema;
@@ -54,6 +57,7 @@ import com.datastax.driver.core.Row;
 public class CqlNativeStorage extends AbstractCassandraStorage
 {
     private static final Logger logger = LoggerFactory.getLogger(CqlNativeStorage.class);
+    public static String BULK_OUTPUT_FORMAT = "org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat";
     private int pageSize = 1000;
     private String columns;
     private String outputQuery;
@@ -83,6 +87,22 @@ public class CqlNativeStorage extends AbstractCassandraStorage
     private String nativeSSLCipherSuites;
     private String inputCql;
 
+    private boolean bulkOutputFormat = false;
+    private String bulkCfSchema;
+    private String bulkInsertStatement;
+    private String bulkOutputLocation;
+    private int bulkBuffSize = -1;
+    private int bulkStreamThrottle = -1;
+    private int bulkMaxFailedHosts = -1;
+    private int storagePort = CqlBulkOutputFormat.DEFAULT_STORAGE_PORT;
+    private int sslStoragePort = CqlBulkOutputFormat.DEFAULT_SSL_STORAGE_PORT;
+    private String serverKeystore;
+    private String serverKeystorePassword;
+    private String serverTruststore;
+    private String serverTruststorePassword;
+    private String serverCipherSuites;
+    private String internodeEncrypt;
+
     public CqlNativeStorage()
     {
         this(1000);
@@ -386,57 +406,22 @@ public class CqlNativeStorage extends AbstractCassandraStorage
         return keys;
     }
 
-
-    /** output: (((name, value), (name, value)), (value ... value), (value...value)) */
-    public void putNext(Tuple t) throws IOException
-    {
-        if (t.size() < 1)
-        {
-            // simply nothing here, we can't even delete without a key
-            logger.warn("Empty output skipped, filter empty tuples to suppress this warning");
-            return;
-        }
-
-        if (t.getType(0) == DataType.TUPLE)
-        {
-            if (t.getType(1) == DataType.TUPLE)
-            {
-                Map<String, ByteBuffer> key = tupleToKeyMap((Tuple)t.get(0));
-                cqlQueryFromTuple(key, t, 1);
-            }
-            else
-                throw new IOException("Second argument in output must be a tuple");
-        }
-        else
-            throw new IOException("First argument in output must be a tuple");
-    }
-
     /** convert key tuple to key map */
     private Map<String, ByteBuffer> tupleToKeyMap(Tuple t) throws IOException
     {
         Map<String, ByteBuffer> keys = new HashMap<String, ByteBuffer>();
         for (int i = 0; i < t.size(); i++)
         {
-            if (t.getType(i) == DataType.TUPLE)
-            {
-                Tuple inner = (Tuple) t.get(i);
-                if (inner.size() == 2)
-                {
-                    Object name = inner.get(0);
-                    if (name != null)
-                    {
-                        keys.put(name.toString(), objToBB(inner.get(1)));
-                    }
-                    else
-                        throw new IOException("Key name was empty");
-                }
-                else
-                    throw new IOException("Keys were not in name and value pairs");
-            }
-            else
-            {
+            if (t.getType(i) != DataType.TUPLE)
                 throw new IOException("keys was not a tuple");
-            }
+
+            Tuple inner = (Tuple) t.get(i);
+            if (inner.size() != 2)
+                throw new IOException("Keys were not in name and value pairs");
+            Object name = inner.get(0);
+            if (name == null)
+                throw new IOException("Key name was empty");
+            keys.put(name.toString(), objToBB(inner.get(1)));
         }
         return keys;
     }
@@ -446,21 +431,16 @@ public class CqlNativeStorage extends AbstractCassandraStorage
     {
         for (int i = offset; i < t.size(); i++)
         {
-            if (t.getType(i) == DataType.TUPLE)
-            {
-                Tuple inner = (Tuple) t.get(i);
-                if (inner.size() > 0)
-                {
-                    List<ByteBuffer> bindedVariables = bindedVariablesFromTuple(inner);
-                    if (bindedVariables.size() > 0)
-                        sendCqlQuery(key, bindedVariables);
-                    else
-                        throw new IOException("Missing binded variables");
-                }
-            }
-            else
-            {
+            if (t.getType(i) != DataType.TUPLE)
                 throw new IOException("Output type was not a tuple");
+
+            Tuple inner = (Tuple) t.get(i);
+            if (inner.size() > 0)
+            {
+                List<ByteBuffer> bindedVariables = bindedVariablesFromTuple(inner);
+                if (bindedVariables.size() <= 0)
+                    throw new IOException("Missing binded variables");
+                sendCqlQuery(key, bindedVariables);
             }
         }
     }
@@ -561,6 +541,37 @@ public class CqlNativeStorage extends AbstractCassandraStorage
         return property.getProperty(PARTITION_FILTER_SIGNATURE);
     }
 
+    /**
+     *  output: (((name, value), (name, value)), (value ... value), (value...value))
+     *  bulk output: ((value ... value), (value...value))
+     *
+     */
+    public void putNext(Tuple t) throws IOException
+    {
+        if (t.size() < 1)
+        {
+            // simply nothing here, we can't even delete without a key
+            logger.warn("Empty output skipped, filter empty tuples to suppress this warning");
+            return;
+        }
+
+        if (t.getType(0) != DataType.TUPLE)
+            throw new IOException("First argument in output must be a tuple");
+
+        if (!bulkOutputFormat && t.getType(1) != DataType.TUPLE)
+            throw new IOException("Second argument in output must be a tuple");
+
+        if (bulkOutputFormat)
+        {
+            cqlQueryFromTuple(null, t, 0);
+        }
+        else
+        {
+            Map<String, ByteBuffer> key = tupleToKeyMap((Tuple)t.get(0));
+            cqlQueryFromTuple(key, t, 1);
+        }
+    }
+
     /** set read configuration settings */
     public void setLocation(String location, Job job) throws IOException
     {
@@ -688,6 +699,42 @@ public class CqlNativeStorage extends AbstractCassandraStorage
         ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family);
         CqlConfigHelper.setOutputCql(conf, outputQuery);
 
+        if (bulkOutputFormat)
+        {
+            DEFAULT_OUTPUT_FORMAT = BULK_OUTPUT_FORMAT;
+            if (bulkCfSchema != null)
+                CqlBulkOutputFormat.setColumnFamilySchema(conf, column_family, bulkCfSchema);
+            else
+                throw new IOException("bulk_cf_schema is missing in input url parameter");
+            if (bulkInsertStatement != null)
+                CqlBulkOutputFormat.setColumnFamilyInsertStatement(conf, column_family, bulkInsertStatement);
+            else
+                throw new IOException("bulk_insert_statement is missing in input url parameter");
+            if (bulkOutputLocation != null)
+                conf.set(AbstractBulkRecordWriter.OUTPUT_LOCATION, bulkOutputLocation);
+            if (bulkBuffSize > 0)
+                conf.set(AbstractBulkRecordWriter.BUFFER_SIZE_IN_MB, String.valueOf(bulkBuffSize));
+            if (bulkStreamThrottle > 0)
+                conf.set(AbstractBulkRecordWriter.STREAM_THROTTLE_MBITS, String.valueOf(bulkStreamThrottle));
+            if (bulkMaxFailedHosts > 0)
+                conf.set(AbstractBulkRecordWriter.MAX_FAILED_HOSTS, String.valueOf(bulkMaxFailedHosts));
+            CqlBulkOutputFormat.setSSLStoragePort(conf, sslStoragePort);
+            CqlBulkOutputFormat.setStoragePort(conf, storagePort);
+            if (serverEncrypted())
+            {
+                if (!StringUtils.isEmpty(serverKeystore))
+                    CqlBulkOutputFormat.setServerKeystore(conf, serverKeystore);
+                if (!StringUtils.isEmpty(serverTruststore))
+                    CqlBulkOutputFormat.setServerTruststore(conf, serverTruststore);
+                if (!StringUtils.isEmpty(serverKeystorePassword))
+                    CqlBulkOutputFormat.setServerKeystorePassword(conf, serverKeystorePassword);
+                if (!StringUtils.isEmpty(serverTruststorePassword))
+                    CqlBulkOutputFormat.setServerTruststorePassword(conf, serverTruststorePassword);
+                if (!StringUtils.isEmpty(serverCipherSuites))
+                    CqlBulkOutputFormat.setServerCipherSuites(conf, serverCipherSuites);
+            }
+        }
+
         setConnectionInformation();
 
         if (ConfigHelper.getOutputRpcPort(conf) == 0)
@@ -700,6 +747,12 @@ public class CqlNativeStorage extends AbstractCassandraStorage
         initSchema(storeSignature);
     }
 
+    private boolean serverEncrypted()
+    {
+        return !StringUtils.isEmpty(internodeEncrypt) && 
+                InternodeEncryption.none != InternodeEncryption.valueOf(internodeEncrypt.toLowerCase());
+    }
+
     private void setLocationFromUri(String location) throws IOException
     {
         try
@@ -720,6 +773,37 @@ public class CqlNativeStorage extends AbstractCassandraStorage
                 if (urlQuery.containsKey("output_query"))
                     outputQuery = urlQuery.get("output_query");
 
+                if (urlQuery.containsKey("bulk_output_format"))
+                    bulkOutputFormat = Boolean.valueOf(urlQuery.get("bulk_output_format"));
+                if (urlQuery.containsKey("bulk_cf_schema"))
+                    bulkCfSchema = urlQuery.get("bulk_cf_schema");
+                if (urlQuery.containsKey("bulk_insert_statement"))
+                    bulkInsertStatement = urlQuery.get("bulk_insert_statement");
+                if (urlQuery.containsKey("bulk_output_location"))
+                    bulkOutputLocation = urlQuery.get("bulk_output_location");
+                if (urlQuery.containsKey("bulk_buff_size"))
+                    bulkBuffSize = Integer.valueOf(urlQuery.get("bulk_buff_size"));
+                if (urlQuery.containsKey("bulk_stream_throttle"))
+                    bulkStreamThrottle = Integer.valueOf(urlQuery.get("bulk_stream_throttle"));
+                if (urlQuery.containsKey("bulk_max_failed_hosts"))
+                    bulkMaxFailedHosts = Integer.valueOf(urlQuery.get("bulk_max_failed_hosts"));
+                if (urlQuery.containsKey("storage_port"))
+                    storagePort = Integer.valueOf(urlQuery.get("storage_port"));
+                if (urlQuery.containsKey("ssl_storage_port"))
+                    sslStoragePort = Integer.valueOf(urlQuery.get("ssl_storage_port"));
+                if (urlQuery.containsKey("internode_encrypt"))
+                    internodeEncrypt = urlQuery.get("internode_encrypt");
+                if (urlQuery.containsKey("server_keystore"))
+                    serverKeystore = urlQuery.get("server_keystore");
+                if (urlQuery.containsKey("server_truststore"))
+                    serverTruststore = urlQuery.get("server_truststore");
+                if (urlQuery.containsKey("server_keystore_pass"))
+                    serverKeystorePassword = urlQuery.get("server_keystore_pass");
+                if (urlQuery.containsKey("server_truststore_pass"))
+                    serverTruststorePassword = urlQuery.get("server_truststore_pass");
+                if (urlQuery.containsKey("server_cipher_suites"))
+                    serverCipherSuites = urlQuery.get("server_cipher_suites");
+
                 //split size
                 if (urlQuery.containsKey("split_size"))
                     splitSize = Integer.parseInt(urlQuery.get("split_size"));
@@ -804,8 +888,15 @@ public class CqlNativeStorage extends AbstractCassandraStorage
                     "[&keep_alive=<keep_alive>][&auth_provider=<auth_provider>][&trust_store_path=<trust_store_path>]" +
                     "[&key_store_path=<key_store_path>][&trust_store_password=<trust_store_password>]" +
                     "[&key_store_password=<key_store_password>][&cipher_suites=<cipher_suites>][&input_cql=<input_cql>]" +
-                    "[columns=<columns>][where_clause=<where_clause>]]': " + e.getMessage());
-        }
+                    "[columns=<columns>][where_clause=<where_clause>]" +
+                    "[&bulk_cf_schema=bulk_cf_schema][&bulk_insert_statement=bulk_insert_statement]" +
+                    "[&bulk_output_location=<bulk_output_location>][&bulk_buff_size=<bulk_buff_size>]" +
+                    "[&storage_port=<storage_port>][&ssl_storage_port=<ssl_storage_port>]" +
+                    "[&server_keystore=<server_keystore>][&server_keystore_pass=<server_keystore_pass>]" +
+                    "[&server_truststore=<server_truststore>][&server_truststore_pass=<server_truststore_pass>]" +
+                    "[&server_cipher_suites=<server_cipher_suites>][&internode_encrypt=<internode_encrypt>]" +
+                    "[&bulk_stream_throttle=<bulk_stream_throttle>][&bulk_max_failed_hosts=<bulk_max_failed_hosts>]]': " +  e.getMessage());
+         }
     }
 
     /**
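
Putting the new URL parameters together, a bulk store location follows this
pattern (values are illustrative, optional parts bracketed as in the usage
string above, and URL-encoding of the schema and insert statement is elided
for readability):

    cql://<keyspace>/<table>?bulk_output_format=true
        &bulk_cf_schema=<CREATE TABLE ...>&bulk_insert_statement=<INSERT ... VALUES (?,?)>
        [&bulk_output_location=<dir>][&bulk_buff_size=<mb>][&bulk_stream_throttle=<mbits>]
        [&bulk_max_failed_hosts=<n>][&storage_port=<port>][&ssl_storage_port=<port>]
        [&internode_encrypt=<none|all|dc|rack>][&server_keystore=<path>][&server_keystore_pass=<pass>]
        [&server_truststore=<path>][&server_truststore_pass=<pass>][&server_cipher_suites=<list>]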

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7b40735/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index c7277fa..66583ec 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -38,4 +38,3 @@ public class CqlStorage extends CqlNativeStorage
         super(pageSize);
     }
 }
-

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7b40735/src/java/org/apache/cassandra/tools/BulkLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java
index 88a4404..f4b30cb 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoader.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoader.java
@@ -254,7 +254,7 @@ public class BulkLoader
         }
     }
 
-    static class ExternalClient extends SSTableLoader.Client
+    public static class ExternalClient extends SSTableLoader.Client
     {
         private final Map<String, CFMetaData> knownCfs = new HashMap<>();
         private final Set<InetAddress> hosts;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7b40735/test/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index ec988e2..7be72dd 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -10,6 +10,7 @@ commitlog_segment_size_in_mb: 5
 partitioner: org.apache.cassandra.dht.ByteOrderedPartitioner
 listen_address: 127.0.0.1
 storage_port: 7010
+ssl_storage_port: 7011
 rpc_port: 9170
 start_native_transport: true
 native_transport_port: 9042

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7b40735/test/pig/org/apache/cassandra/pig/CqlTableTest.java
----------------------------------------------------------------------
diff --git a/test/pig/org/apache/cassandra/pig/CqlTableTest.java b/test/pig/org/apache/cassandra/pig/CqlTableTest.java
index 4ca043d..2e1758e 100644
--- a/test/pig/org/apache/cassandra/pig/CqlTableTest.java
+++ b/test/pig/org/apache/cassandra/pig/CqlTableTest.java
@@ -53,6 +53,10 @@ public class CqlTableTest extends PigTestBase
             "CREATE INDEX test_b on test (b);",
 
             "CREATE TABLE moredata (x int PRIMARY KEY, y int);",
+            "CREATE TABLE test_bulk (a int PRIMARY KEY, b int);",
+            "INSERT INTO test_bulk (a,b) VALUES (1,1);",
+            "INSERT INTO test_bulk (a,b) VALUES (2,2);",
+            "INSERT INTO test_bulk (a,b) VALUES (3,3);",
             "INSERT INTO test (a,b) VALUES (1,1);",
             "INSERT INTO test (a,b) VALUES (2,2);",
             "INSERT INTO test (a,b) VALUES (3,3);",
@@ -160,10 +164,13 @@ public class CqlTableTest extends PigTestBase
         //input_cql=select * from test where token(a) > ? and token(a) <= ?
         pig.registerQuery("result= LOAD 'cql://cql3ks/test?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20test%20where%20token(a)%20%3E%20%3F%20and%20token(a)%20%3C%3D%20%3F' USING CqlNativeStorage();");
         Iterator<Tuple> it = pig.openIterator("result");
+        int count = 0;
         while (it.hasNext()) {
             Tuple t = it.next();
             Assert.assertEquals(t.get(0), t.get(1));
+            count++;
         }
+        Assert.assertEquals(6, count);
     }
 
     @Test
@@ -310,4 +317,33 @@ public class CqlTableTest extends PigTestBase
             Assert.fail("Can't fetch any data");
         }
     }
+
+    @Test
+    public void testCqlStorageSingleKeyTableBulkLoad()
+    throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
+    {
+        pig.setBatchOn();
+        //input_cql=select * from moredata where token(x) > ? and token(x) <= ?
+        pig.registerQuery("moretestvalues= LOAD 'cql://cql3ks/moredata?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20moredata%20where%20token(x)%20%3E%20%3F%20and%20token(x)%20%3C%3D%20%3F' USING CqlNativeStorage();");
+        pig.registerQuery("insertformat= FOREACH moretestvalues GENERATE TOTUPLE(x, y);");
+        pig.registerQuery("STORE insertformat INTO 'cql://cql3ks/test_bulk?" + defaultParameters + nativeParameters +  "&bulk_output_format=true&bulk_cf_schema=CREATE%20TABLE%20cql3ks.test_bulk%20(a%20int%20PRIMARY%20KEY%2C%20b%20int)&bulk_insert_statement=Insert%20into%20cql3ks.test_bulk(a%2C%20b)%20values(%3F%2C%3F)' USING CqlNativeStorage();");
+        pig.executeBatch();
+
+        //(5,5)
+        //(6,6)
+        //(4,4)
+        //(2,2)
+        //(3,3)
+        //(1,1)
+        //input_cql=select * from test_bulk where token(a) > ? and token(a) <= ?
+        pig.registerQuery("result= LOAD 'cql://cql3ks/test_bulk?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20test_bulk%20where%20token(a)%20%3E%20%3F%20and%20token(a)%20%3C%3D%20%3F' USING CqlNativeStorage();");
+        Iterator<Tuple> it = pig.openIterator("result");
+        int count = 0;
+        while (it.hasNext()) {
+            Tuple t = it.next();
+            Assert.assertEquals(t.get(0), t.get(1));
+            count++;
+        }
+        Assert.assertEquals(6, count);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c7b40735/test/pig/org/apache/cassandra/pig/PigTestBase.java
----------------------------------------------------------------------
diff --git a/test/pig/org/apache/cassandra/pig/PigTestBase.java b/test/pig/org/apache/cassandra/pig/PigTestBase.java
index 4b3e422..e6964f8 100644
--- a/test/pig/org/apache/cassandra/pig/PigTestBase.java
+++ b/test/pig/org/apache/cassandra/pig/PigTestBase.java
@@ -65,7 +65,8 @@ public class PigTestBase extends SchemaLoader
     protected static Configuration conf;
     protected static MiniCluster cluster; 
     protected static PigServer pig;
-    protected static String defaultParameters= "init_address=localhost&rpc_port=9170&partitioner=org.apache.cassandra.dht.ByteOrderedPartitioner";
+    protected static String defaultParameters= "init_address=localhost&rpc_port=9170&partitioner=org.apache.cassandra.dht.ByteOrderedPartitioner" +
+                                               "&storage_port=7010&ssl_storage_port=7011&internode_encrypt=NONE";
     protected static String nativeParameters = "&core_conns=2&max_conns=10&min_simult_reqs=3&max_simult_reqs=10&native_timeout=10000000"  +
                                                "&native_read_timeout=10000000&send_buff_size=4096&receive_buff_size=4096&solinger=3" +
                                                "&tcp_nodelay=true&reuse_address=true&keep_alive=true&native_port=9042";


[2/2] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2

Posted by al...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cd4a1e6a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cd4a1e6a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cd4a1e6a

Branch: refs/heads/cassandra-2.2
Commit: cd4a1e6acc51b0f708127a50d37dd7832bbe8dfa
Parents: 9791796 c7b4073
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Tue Sep 15 16:12:45 2015 +0100
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Tue Sep 15 16:12:45 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../hadoop/AbstractColumnFamilyInputFormat.java |   2 +
 .../hadoop/cql3/CqlBulkRecordWriter.java        |  13 +-
 .../cassandra/hadoop/pig/CqlNativeStorage.java  | 171 ++++++++++++-------
 test/conf/cassandra_pig.yaml                    |  41 +++++
 .../org/apache/cassandra/pig/CqlTableTest.java  |  35 ++++
 .../org/apache/cassandra/pig/PigTestBase.java   |   3 +-
 7 files changed, 205 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd4a1e6a/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index bd24781,5f11049..b0ade42
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,17 -1,8 +1,18 @@@
 -2.1.10
 +2.2.2
 + * Defer default role manager setup until all nodes are on 2.2+ (CASSANDRA-9761)
 + * Cancel transaction for sstables we wont redistribute index summary
 +   for (CASSANDRA-10270)
 + * Handle missing RoleManager in config after upgrade to 2.2 (CASSANDRA-10209) 
 + * Retry snapshot deletion after compaction and gc on Windows (CASSANDRA-10222)
 + * Fix failure to start with space in directory path on Windows (CASSANDRA-10239)
 + * Fix repair hang when snapshot failed (CASSANDRA-10057)
 + * Fall back to 1/4 commitlog volume for commitlog_total_space on small disks
 +   (CASSANDRA-10199)
 +Merged from 2.1:
+  * (Pig) support BulkOutputFormat as a URL parameter (CASSANDRA-7410)
   * BATCH statement is broken in cqlsh (CASSANDRA-10272)
   * Added configurable warning threshold for GC duration (CASSANDRA-8907)
 - * (cqlsh) Make cqlsh PEP8 compliant (CASSANDRA-10066)
 + * (cqlsh) Make cqlsh PEP8 Compliant (CASSANDRA-10066)
   * (cqlsh) Fix error when starting cqlsh with --debug (CASSANDRA-10282)
   * Scrub, Cleanup and Upgrade do not unmark compacting until all operations
     have completed, regardless of the occurence of exceptions (CASSANDRA-10274)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd4a1e6a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
index 4dd53ff,e8de0f2..9c45bfe
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
@@@ -24,23 -32,31 +24,24 @@@ import java.util.concurrent.*
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
 -import org.apache.cassandra.auth.IAuthenticator;
 +import com.datastax.driver.core.Host;
 +import com.datastax.driver.core.Metadata;
 +import com.datastax.driver.core.ResultSet;
 +import com.datastax.driver.core.Row;
 +import com.datastax.driver.core.Session;
 +import com.datastax.driver.core.TokenRange;
++
 +import org.apache.cassandra.db.SystemKeyspace;
 +import org.apache.cassandra.dht.ByteOrderedPartitioner;
  import org.apache.cassandra.dht.IPartitioner;
 +import org.apache.cassandra.dht.OrderPreservingPartitioner;
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
 -import org.apache.cassandra.thrift.AuthenticationRequest;
 -import org.apache.cassandra.thrift.Cassandra;
 -import org.apache.cassandra.thrift.CfSplit;
 -import org.apache.cassandra.thrift.InvalidRequestException;
 +import org.apache.cassandra.hadoop.cql3.*;
  import org.apache.cassandra.thrift.KeyRange;
 -import org.apache.cassandra.thrift.TokenRange;
 -import org.apache.commons.lang3.StringUtils;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.mapred.JobConf;
 -import org.apache.hadoop.mapreduce.InputFormat;
 -import org.apache.hadoop.mapreduce.InputSplit;
 -import org.apache.hadoop.mapreduce.JobContext;
 -import org.apache.hadoop.mapreduce.TaskAttemptContext;
 -import org.apache.hadoop.mapreduce.TaskAttemptID;
 -import org.apache.thrift.TApplicationException;
 -import org.apache.thrift.TException;
 -import org.apache.thrift.protocol.TBinaryProtocol;
 -import org.apache.thrift.protocol.TProtocol;
 -import org.apache.thrift.transport.TTransport;
 -import org.apache.thrift.transport.TTransportException;
 -
 +import org.apache.hadoop.mapreduce.*;
  
  public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<K, Y> implements org.apache.hadoop.mapred.InputFormat<K, Y>
  {
@@@ -226,64 -271,83 +227,65 @@@
          }
      }
  
 -    private List<CfSplit> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException
 +    private Map<TokenRange, Long> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException
      {
 -        int splitsize = ConfigHelper.getInputSplitSize(conf);
 -        for (int i = 0; i < range.rpc_endpoints.size(); i++)
 +        int splitSize = ConfigHelper.getInputSplitSize(conf);
 +        try
          {
 -            String host = range.rpc_endpoints.get(i);
 -
 -            if (host == null || host.equals("0.0.0.0"))
 -                host = range.endpoints.get(i);
 -
 -            try
 -            {
 -                Cassandra.Client client = ConfigHelper.createConnection(conf, host, ConfigHelper.getInputRpcPort(conf));
 -                client.set_keyspace(keyspace);
 -
 -                try
 -                {
 -                    return client.describe_splits_ex(cfName, range.start_token, range.end_token, splitsize);
 -                }
 -                catch (TApplicationException e)
 -                {
 -                    // fallback to guessing split size if talking to a server without describe_splits_ex method
 -                    if (e.getType() == TApplicationException.UNKNOWN_METHOD)
 -                    {
 -                        List<String> splitPoints = client.describe_splits(cfName, range.start_token, range.end_token, splitsize);
 -                        return tokenListToSplits(splitPoints, splitsize);
 -                    }
 -                    throw e;
 -                }
 -            }
 -            catch (IOException e)
 -            {
 -                logger.debug("failed connect to endpoint {}", host, e);
 -            }
 -            catch (InvalidRequestException e)
 -            {
 -                throw new RuntimeException(e);
 -            }
 -            catch (TException e)
 -            {
 -                throw new RuntimeException(e);
 -            }
 +            return describeSplits(keyspace, cfName, range, splitSize);
 +        }
 +        catch (Exception e)
 +        {
 +            throw new RuntimeException(e);
          }
 -        throw new IOException("failed connecting to all endpoints " + StringUtils.join(range.endpoints, ","));
      }
  
 -    private List<CfSplit> tokenListToSplits(List<String> splitTokens, int splitsize)
 +    private Map<TokenRange, Set<Host>> getRangeMap(Configuration conf, String keyspace)
      {
 -        List<CfSplit> splits = Lists.newArrayListWithExpectedSize(splitTokens.size() - 1);
 -        for (int j = 0; j < splitTokens.size() - 1; j++)
 -            splits.add(new CfSplit(splitTokens.get(j), splitTokens.get(j + 1), splitsize));
 -        return splits;
 +        try (Session session = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf).connect())
 +        {
 +            Map<TokenRange, Set<Host>> map = new HashMap<>();
 +            Metadata metadata = session.getCluster().getMetadata();
 +            for (TokenRange tokenRange : metadata.getTokenRanges())
 +                map.put(tokenRange, metadata.getReplicas('"' + keyspace + '"', tokenRange));
 +            return map;
 +        }
      }
  
 -    private List<TokenRange> getRangeMap(Configuration conf) throws IOException
 +    private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, int splitSize)
      {
 -        Cassandra.Client client = ConfigHelper.getClientFromInputAddressList(conf);
 -
 -        List<TokenRange> map;
 -        try
 +        String query = String.format("SELECT mean_partition_size, partitions_count " +
 +                                     "FROM %s.%s " +
 +                                     "WHERE keyspace_name = ? AND table_name = ? AND range_start = ? AND range_end = ?",
 +                                     SystemKeyspace.NAME,
 +                                     SystemKeyspace.SIZE_ESTIMATES);
 +
 +        ResultSet resultSet = session.execute(query, keyspace, table, tokenRange.getStart().toString(), tokenRange.getEnd().toString());
 +
 +        Row row = resultSet.one();
 +        // If we have no data on this split, return the full split i.e., do not sub-split
 +        // Assume smallest granularity of partition count available from CASSANDRA-7688
 +        if (row == null)
          {
 -            map = client.describe_local_ring(ConfigHelper.getInputKeyspace(conf));
 +            Map<TokenRange, Long> wrappedTokenRange = new HashMap<>();
 +            wrappedTokenRange.put(tokenRange, (long) 128);
 +            return wrappedTokenRange;
          }
 -        catch (InvalidRequestException e)
 -        {
 -            throw new RuntimeException(e);
 -        }
 -        catch (TException e)
 -        {
 -            throw new RuntimeException(e);
 -        }
 -        return map;
 +
 +        long meanPartitionSize = row.getLong("mean_partition_size");
 +        long partitionCount = row.getLong("partitions_count");
 +
 +        int splitCount = (int)((meanPartitionSize * partitionCount) / splitSize);
++        if (splitCount <= 0) splitCount = 1;
 +        List<TokenRange> splitRanges = tokenRange.splitEvenly(splitCount);
 +        Map<TokenRange, Long> rangesWithLength = new HashMap<>();
 +        for (TokenRange range : splitRanges)
 +            rangesWithLength.put(range, partitionCount/splitCount);
 +
 +        return rangesWithLength;
      }
  
 -    //
      // Old Hadoop API
 -    //
      public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException
      {
          TaskAttemptContext tac = HadoopCompat.newTaskAttemptContext(jobConf, new TaskAttemptID());
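
For reference, the describeSplits() call above boils down to this read of the
size estimates table from CASSANDRA-7688 (the bind values shown inline are
placeholders for the keyspace, table and token range bounds):

    SELECT mean_partition_size, partitions_count
    FROM system.size_estimates
    WHERE keyspace_name = 'cql3ks'
      AND table_name = 'test'
      AND range_start = '-9223372036854775808'
      AND range_end = '-3074457345618258603';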

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd4a1e6a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
index e77c4c8,ced8aa9..9e6e23b
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
@@@ -23,16 -22,15 +23,17 @@@ import java.io.IOException
  import java.net.InetAddress;
  import java.net.UnknownHostException;
  import java.nio.ByteBuffer;
 -import java.util.HashSet;
 -import java.util.List;
 -import java.util.Set;
 -import java.util.UUID;
 +import java.util.*;
 +import java.util.concurrent.*;
  
 -import org.apache.cassandra.config.EncryptionOptions;
 -import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
- 
 +import org.apache.cassandra.config.CFMetaData;
 +import org.apache.cassandra.config.Config;
 +import org.apache.cassandra.config.DatabaseDescriptor;
++import org.apache.cassandra.dht.IPartitioner;
++import org.apache.cassandra.dht.Murmur3Partitioner;
  import org.apache.cassandra.exceptions.InvalidRequestException;
 -import org.apache.cassandra.hadoop.AbstractBulkRecordWriter;
  import org.apache.cassandra.hadoop.BulkRecordWriter;
  import org.apache.cassandra.hadoop.ConfigHelper;
  import org.apache.cassandra.hadoop.HadoopCompat;
@@@ -84,6 -67,6 +85,7 @@@ public class CqlBulkRecordWriter extend
      private String insertStatement;
      private File outputDir;
      private boolean deleteSrc;
++    private IPartitioner partitioner;
  
      CqlBulkRecordWriter(TaskAttemptContext context) throws IOException
      {
@@@ -113,55 -90,45 +115,64 @@@
      {
          // if anything is missing, exceptions will be thrown here, instead of on write()
          keyspace = ConfigHelper.getOutputKeyspace(conf);
 -        columnFamily = ConfigHelper.getOutputColumnFamily(conf);
 -        schema = CqlBulkOutputFormat.getColumnFamilySchema(conf, columnFamily);
 -        insertStatement = CqlBulkOutputFormat.getColumnFamilyInsertStatement(conf, columnFamily);
 -        outputDir = getColumnFamilyDirectory();
 +        table = ConfigHelper.getOutputColumnFamily(conf);
 +        
 +        // check if table is aliased
 +        String aliasedCf = CqlBulkOutputFormat.getTableForAlias(conf, table);
 +        if (aliasedCf != null)
 +            table = aliasedCf;
 +        
 +        schema = CqlBulkOutputFormat.getTableSchema(conf, table);
 +        insertStatement = CqlBulkOutputFormat.getTableInsertStatement(conf, table);
 +        outputDir = getTableDirectory();
          deleteSrc = CqlBulkOutputFormat.getDeleteSourceOnSuccess(conf);
++        try
++        {
++            partitioner = ConfigHelper.getInputPartitioner(conf);
++        }
++        catch (Exception e)
++        {
++            partitioner = Murmur3Partitioner.instance;
++        }
 +    }
 +
 +    protected String getOutputLocation() throws IOException
 +    {
 +        String dir = conf.get(OUTPUT_LOCATION, System.getProperty("java.io.tmpdir"));
 +        if (dir == null)
 +            throw new IOException("Output directory not defined, if hadoop is not setting java.io.tmpdir then define " + OUTPUT_LOCATION);
 +        return dir;
      }
  
 -    
      private void prepareWriter() throws IOException
      {
 -        try
 +        if (writer == null)
          {
 -            if (writer == null)
 -            {
 -                writer = CQLSSTableWriter.builder()
 -                    .forTable(schema)
 -                    .using(insertStatement)
 -                    .withPartitioner(ConfigHelper.getOutputPartitioner(conf))
 -                    .inDirectory(outputDir)
 -                    .withBufferSizeInMB(Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64")))
 -                    .build();
 -            }
 -            if (loader == null)
 -            {
 -                BulkLoader.ExternalClient externalClient = getExternalClient(conf);
 -                this.loader = new SSTableLoader(outputDir, externalClient, new BulkRecordWriter.NullOutputHandler()) {
 -                    @Override
 -                    public void onSuccess(StreamState finalState)
 -                    {
 -                        if (deleteSrc)
 -                            FileUtils.deleteRecursive(outputDir);
 -                    }
 -                };
 -            }
 +            writer = CQLSSTableWriter.builder()
 +                                     .forTable(schema)
 +                                     .using(insertStatement)
 +                                     .withPartitioner(ConfigHelper.getOutputPartitioner(conf))
 +                                     .inDirectory(outputDir)
 +                                     .withBufferSizeInMB(Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64")))
++                                     .withPartitioner(partitioner)
 +                                     .build();
          }
 -        catch (Exception e)
 +
 +        if (loader == null)
          {
 -            throw new IOException(e);
 -        }      
 +            ExternalClient externalClient = new ExternalClient(conf);
 +            externalClient.setTableMetadata(CFMetaData.compile(schema, keyspace));
 +
 +            loader = new SSTableLoader(outputDir, externalClient, new BulkRecordWriter.NullOutputHandler())
 +            {
 +                @Override
 +                public void onSuccess(StreamState finalState)
 +                {
 +                    if (deleteSrc)
 +                        FileUtils.deleteRecursive(outputDir);
 +                }
 +            };
 +        }
      }
      
      /**
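
Note the partitioner fallback added in this merge: the writer prefers
ConfigHelper.getInputPartitioner(conf) and only defaults to
Murmur3Partitioner.instance when none is configured, so jobs on another
partitioner should set it explicitly, e.g. (class name illustrative):

    ConfigHelper.setInputPartitioner(conf, "org.apache.cassandra.dht.ByteOrderedPartitioner");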

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd4a1e6a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
index dc3c174,5287bf5..223a848
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
@@@ -17,71 -17,47 +17,75 @@@
   */
  package org.apache.cassandra.hadoop.pig;
  
 +import java.io.ByteArrayInputStream;
 +import java.io.ByteArrayOutputStream;
  import java.io.IOException;
 +import java.io.ObjectInputStream;
 +import java.io.ObjectOutputStream;
 +import java.io.Serializable;
 +import java.io.UnsupportedEncodingException;
 +import java.net.URLDecoder;
  import java.nio.ByteBuffer;
 -import java.nio.charset.CharacterCodingException;
  import java.util.*;
  
 -import org.apache.cassandra.config.CFMetaData;
 -import org.apache.cassandra.config.ColumnDefinition;
 -import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions.InternodeEncryption;
 -import org.apache.cassandra.db.BufferCell;
 -import org.apache.cassandra.db.Cell;
 -import org.apache.cassandra.db.composites.CellNames;
 +import com.datastax.driver.core.ColumnMetadata;
 +import com.datastax.driver.core.Metadata;
 +import com.datastax.driver.core.Row;
 +import com.datastax.driver.core.Session;
 +import com.datastax.driver.core.TableMetadata;
 +import com.datastax.driver.core.exceptions.NoHostAvailableException;
++
  import org.apache.cassandra.db.marshal.*;
 +import org.apache.cassandra.exceptions.AuthenticationException;
  import org.apache.cassandra.exceptions.ConfigurationException;
 -import org.apache.cassandra.hadoop.AbstractBulkRecordWriter;
 +import org.apache.cassandra.exceptions.SyntaxException;
  import org.apache.cassandra.hadoop.ConfigHelper;
  import org.apache.cassandra.hadoop.HadoopCompat;
+ import org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat;
++import org.apache.cassandra.hadoop.cql3.CqlBulkRecordWriter;
  import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
  import org.apache.cassandra.hadoop.cql3.CqlRecordReader;
 -import org.apache.cassandra.thrift.*;
 +import org.apache.cassandra.serializers.CollectionSerializer;
  import org.apache.cassandra.utils.*;
 -import org.apache.commons.lang.StringUtils;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.mapreduce.*;
 -import org.apache.pig.Expression;
 -import org.apache.pig.ResourceSchema;
 +import org.apache.pig.*;
  import org.apache.pig.Expression.OpType;
  import org.apache.pig.ResourceSchema.ResourceFieldSchema;
  import org.apache.pig.backend.executionengine.ExecException;
  import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
  import org.apache.pig.data.*;
  import org.apache.pig.impl.util.UDFContext;
 -import org.apache.thrift.TException;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
 +import org.yaml.snakeyaml.external.biz.base64Coder.Base64Coder;
  
 -import com.datastax.driver.core.Row;
  
 -public class CqlNativeStorage extends AbstractCassandraStorage
 +public class CqlNativeStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata
  {
 +    protected String DEFAULT_INPUT_FORMAT;
 +    protected String DEFAULT_OUTPUT_FORMAT;
 +
 +    protected String username;
 +    protected String password;
 +    protected String keyspace;
 +    protected String column_family;
 +    protected String loadSignature;
 +    protected String storeSignature;
 +
 +    protected Configuration conf;
 +    protected String inputFormatClass;
 +    protected String outputFormatClass;
 +    protected int splitSize = 64 * 1024;
 +    protected String partitionerClass;
 +    protected boolean usePartitionFilter = false;
 +    protected String initHostAddress;
 +    protected String rpcPort;
 +    protected int nativeProtocolVersion = 1;
 +
      private static final Logger logger = LoggerFactory.getLogger(CqlNativeStorage.class);
 -    public static String BULK_OUTPUT_FORMAT = "org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat";
++    private static String BULK_OUTPUT_FORMAT = "org.apache.cassandra.hadoop.cql3.CqlBulkOutputFormat";
      private int pageSize = 1000;
      private String columns;
      private String outputQuery;
@@@ -109,6 -87,22 +113,16 @@@
      private String nativeSSLCipherSuites;
      private String inputCql;
  
+     private boolean bulkOutputFormat = false;
+     private String bulkCfSchema;
+     private String bulkInsertStatement;
+     private String bulkOutputLocation;
+     private int bulkBuffSize = -1;
+     private int bulkStreamThrottle = -1;
+     private int bulkMaxFailedHosts = -1;
 -    private int storagePort = CqlBulkOutputFormat.DEFAULT_STORAGE_PORT;
 -    private int sslStoragePort = CqlBulkOutputFormat.DEFAULT_SSL_STORAGE_PORT;
 -    private String serverKeystore;
 -    private String serverKeystorePassword;
 -    private String serverTruststore;
 -    private String serverTruststorePassword;
 -    private String serverCipherSuites;
 -    private String internodeEncrypt;
++    private boolean bulkDeleteSourceOnSuccess = true;
++    private String bulkTableAlias;
+ 
      public CqlNativeStorage()
      {
          this(1000);
@@@ -241,83 -240,188 +255,48 @@@
          return obj;
      }
  
 -    /** include key columns */
 -    protected List<ColumnDef> getColumnMetadata(Cassandra.Client client)
 -            throws InvalidRequestException,
 -            UnavailableException,
 -            TimedOutException,
 -            SchemaDisagreementException,
 -            TException,
 -            CharacterCodingException,
 -            org.apache.cassandra.exceptions.InvalidRequestException,
 -            ConfigurationException,
 -            NotFoundException
 -    {
 -        List<ColumnDef> keyColumns = null;
 -        // get key columns
 +    /** get the columnfamily definition for the signature */
 +    protected TableInfo getCfInfo(String signature) throws IOException
 +    {
 +        UDFContext context = UDFContext.getUDFContext();
 +        Properties property = context.getUDFProperties(CqlNativeStorage.class);
 +        TableInfo cfInfo;
          try
          {
 -            keyColumns = getKeysMeta(client);
 +            cfInfo = cfdefFromString(property.getProperty(signature));
          }
 -        catch(Exception e)
 +        catch (ClassNotFoundException e)
          {
 -            logger.error("Error in retrieving key columns" , e);
 +            throw new IOException(e);
          }
 -
 -        // get other columns
 -        List<ColumnDef> columns = getColumnMeta(client, false, !hasCompactValueAlias);
 -
 -        // combine all columns in a list
 -        if (keyColumns != null && columns != null)
 -            keyColumns.addAll(columns);
 -
 -        return keyColumns;
 +        return cfInfo;
      }
  
 -    /** get keys meta data */
 -    private List<ColumnDef> getKeysMeta(Cassandra.Client client)
 -            throws Exception
 +    /** return the table metadata for the column family */
 +    protected TableMetadata getCfInfo(Session client)
 +            throws NoHostAvailableException,
 +            AuthenticationException,
 +            IllegalStateException
      {
 -        String query = "SELECT key_aliases, " +
 -                "       column_aliases, " +
 -                "       key_validator, " +
 -                "       comparator, " +
 -                "       keyspace_name, " +
 -                "       value_alias, " +
 -                "       default_validator " +
 -                "FROM system.schema_columnfamilies " +
 -                "WHERE keyspace_name = '%s'" +
 -                "  AND columnfamily_name = '%s' ";
 -
 -        CqlResult result = client.execute_cql3_query(
 -                ByteBufferUtil.bytes(String.format(query, keyspace, column_family)),
 -                Compression.NONE,
 -                ConsistencyLevel.ONE);
 -
 -        if (result == null || result.rows == null || result.rows.isEmpty())
 -            return null;
 -
 -        Iterator<CqlRow> iteraRow = result.rows.iterator();
 -        List<ColumnDef> keys = new ArrayList<ColumnDef>();
 -        if (iteraRow.hasNext())
 -        {
 -            CqlRow cqlRow = iteraRow.next();
 -            String name = ByteBufferUtil.string(cqlRow.columns.get(4).value);
 -            logger.debug("Found ksDef name: {}", name);
 -            String keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(0).getValue()));
 -
 -            logger.debug("partition keys: {}", keyString);
 -            List<String> keyNames = FBUtilities.fromJsonList(keyString);
 -
 -            Iterator<String> iterator = keyNames.iterator();
 -            while (iterator.hasNext())
 -            {
 -                ColumnDef cDef = new ColumnDef();
 -                cDef.name = ByteBufferUtil.bytes(iterator.next());
 -                keys.add(cDef);
 -            }
 -            // classic thrift tables
 -            if (keys.size() == 0)
 -            {
 -                CFMetaData cfm = getCFMetaData(keyspace, column_family, client);
 -                for (ColumnDefinition def : cfm.partitionKeyColumns())
 -                {
 -                    String key = def.name.toString();
 -                    logger.debug("name: {} ", key);
 -                    ColumnDef cDef = new ColumnDef();
 -                    cDef.name = ByteBufferUtil.bytes(key);
 -                    keys.add(cDef);
 -                }
 -                for (ColumnDefinition def : cfm.clusteringColumns())
 -                {
 -                    String key = def.name.toString();
 -                    logger.debug("name: {} ", key);
 -                    ColumnDef cDef = new ColumnDef();
 -                    cDef.name = ByteBufferUtil.bytes(key);
 -                    keys.add(cDef);
 -                }
 -            }
 -
 -            keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(1).getValue()));
 -
 -            logger.debug("cluster keys: {}", keyString);
 -            keyNames = FBUtilities.fromJsonList(keyString);
 -
 -            iterator = keyNames.iterator();
 -            while (iterator.hasNext())
 -            {
 -                ColumnDef cDef = new ColumnDef();
 -                cDef.name = ByteBufferUtil.bytes(iterator.next());
 -                keys.add(cDef);
 -            }
 -
 -            String validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(2).getValue()));
 -            logger.debug("row key validator: {}", validator);
 -            AbstractType<?> keyValidator = parseType(validator);
 -
 -            Iterator<ColumnDef> keyItera = keys.iterator();
 -            if (keyValidator instanceof CompositeType)
 -            {
 -                Iterator<AbstractType<?>> typeItera = ((CompositeType) keyValidator).types.iterator();
 -                while (typeItera.hasNext())
 -                    keyItera.next().validation_class = typeItera.next().toString();
 -            }
 -            else
 -                keyItera.next().validation_class = keyValidator.toString();
 -
 -            validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(3).getValue()));
 -            logger.debug("cluster key validator: {}", validator);
 -
 -            if (keyItera.hasNext() && validator != null && !validator.isEmpty())
 -            {
 -                AbstractType<?> clusterKeyValidator = parseType(validator);
 -
 -                if (clusterKeyValidator instanceof CompositeType)
 -                {
 -                    Iterator<AbstractType<?>> typeItera = ((CompositeType) clusterKeyValidator).types.iterator();
 -                    while (keyItera.hasNext())
 -                        keyItera.next().validation_class = typeItera.next().toString();
 -                }
 -                else
 -                    keyItera.next().validation_class = clusterKeyValidator.toString();
 -            }
 -
 -            // compact value_alias column
 -            if (cqlRow.columns.get(5).value != null)
 -            {
 -                try
 -                {
 -                    String compactValidator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(6).getValue()));
 -                    logger.debug("default validator: {}", compactValidator);
 -                    AbstractType<?> defaultValidator = parseType(compactValidator);
 -
 -                    ColumnDef cDef = new ColumnDef();
 -                    cDef.name = cqlRow.columns.get(5).value;
 -                    cDef.validation_class = defaultValidator.toString();
 -                    keys.add(cDef);
 -                    hasCompactValueAlias = true;
 -                }
 -                catch (Exception e)
 -                {
 -                    // no compact column at value_alias
 -                }
 -            }
 -
 -        }
 -        return keys;
 +        // get CF meta data
 +        return client.getCluster().getMetadata().getKeyspace(Metadata.quote(keyspace)).getTable(Metadata.quote(column_family));
      }
  
-     /** output: (((name, value), (name, value)), (value ... value), (value...value)) */
-     public void putNext(Tuple t) throws IOException
-     {
-         if (t.size() < 1)
-         {
-             // simply nothing here, we can't even delete without a key
-             logger.warn("Empty output skipped, filter empty tuples to suppress this warning");
-             return;
-         }
- 
-         if (t.getType(0) == DataType.TUPLE)
-         {
-             if (t.getType(1) == DataType.TUPLE)
-             {
-                 Map<String, ByteBuffer> key = tupleToKeyMap((Tuple)t.get(0));
-                 cqlQueryFromTuple(key, t, 1);
-             }
-             else
-                 throw new IOException("Second argument in output must be a tuple");
-         }
-         else
-             throw new IOException("First argument in output must be a tuple");
-     }
- 
      /** convert key tuple to key map */
      private Map<String, ByteBuffer> tupleToKeyMap(Tuple t) throws IOException
      {
          Map<String, ByteBuffer> keys = new HashMap<String, ByteBuffer>();
          for (int i = 0; i < t.size(); i++)
          {
-             if (t.getType(i) == DataType.TUPLE)
-             {
-                 Tuple inner = (Tuple) t.get(i);
-                 if (inner.size() == 2)
-                 {
-                     Object name = inner.get(0);
-                     if (name != null)
-                     {
-                         keys.put(name.toString(), objToBB(inner.get(1)));
-                     }
-                     else
-                         throw new IOException("Key name was empty");
-                 }
-                 else
-                     throw new IOException("Keys were not in name and value pairs");
-             }
-             else
-             {
+             if (t.getType(i) != DataType.TUPLE)
                  throw new IOException("keys was not a tuple");
-             }
 -
+             Tuple inner = (Tuple) t.get(i);
+             if (inner.size() != 2)
+                 throw new IOException("Keys were not in name and value pairs");
+             Object name = inner.get(0);
+             if (name == null)
+                 throw new IOException("Key name was empty");
+             keys.put(name.toString(), objToBB(inner.get(1)));
          }
          return keys;
      }
@@@ -543,10 -537,41 +517,41 @@@
      private String getWhereClauseForPartitionFilter()
      {
          UDFContext context = UDFContext.getUDFContext();
 -        Properties property = context.getUDFProperties(AbstractCassandraStorage.class);
 -        return property.getProperty(PARTITION_FILTER_SIGNATURE);
 +        Properties property = context.getUDFProperties(CqlNativeStorage.class);
 +        return property.getProperty(StorageHelper.PARTITION_FILTER_SIGNATURE);
      }
  
+     /**
+      * output: (((name, value), (name, value)), (value ... value), (value ... value))
+      * bulk output: ((value ... value), (value ... value))
+      */
+     public void putNext(Tuple t) throws IOException
+     {
+         if (t.size() < 1)
+         {
+             // simply nothing here, we can't even delete without a key
+             logger.warn("Empty output skipped, filter empty tuples to suppress this warning");
+             return;
+         }
+ 
+         if (t.getType(0) != DataType.TUPLE)
+             throw new IOException("First argument in output must be a tuple");
+ 
+         if (!bulkOutputFormat && t.getType(1) != DataType.TUPLE)
+             throw new IOException("Second argument in output must be a tuple");
+ 
+         if (bulkOutputFormat)
+         {
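+             // bulk output has no leading key-map tuple; field binding starts at index 0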
+             cqlQueryFromTuple(null, t, 0);
+         }
+         else
+         {
+             Map<String, ByteBuffer> key = tupleToKeyMap((Tuple)t.get(0));
+             cqlQueryFromTuple(key, t, 1);
+         }
+     }
+ 
      /** set read configuration settings */
      public void setLocation(String location, Job job) throws IOException
      {
@@@ -672,6 -699,42 +677,32 @@@
          ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family);
          CqlConfigHelper.setOutputCql(conf, outputQuery);
  
+         if (bulkOutputFormat)
+         {
+             DEFAULT_OUTPUT_FORMAT = BULK_OUTPUT_FORMAT;
+             if (bulkCfSchema != null)
 -                CqlBulkOutputFormat.setColumnFamilySchema(conf, column_family, bulkCfSchema);
++                CqlBulkOutputFormat.setTableSchema(conf, column_family, bulkCfSchema);
+             else
+                 throw new IOException("bulk_cf_schema is missing in input url parameter");
+             if (bulkInsertStatement != null)
 -                CqlBulkOutputFormat.setColumnFamilyInsertStatement(conf, column_family, bulkInsertStatement);
++                CqlBulkOutputFormat.setTableInsertStatement(conf, column_family, bulkInsertStatement);
+             else
+                 throw new IOException("bulk_insert_statement is missing in input url parameter");
++            if (bulkTableAlias != null)
++                CqlBulkOutputFormat.setTableAlias(conf, bulkTableAlias, column_family);
++            CqlBulkOutputFormat.setDeleteSourceOnSuccess(conf, bulkDeleteSourceOnSuccess);
+             if (bulkOutputLocation != null)
 -                conf.set(AbstractBulkRecordWriter.OUTPUT_LOCATION, bulkOutputLocation);
++                conf.set(CqlBulkRecordWriter.OUTPUT_LOCATION, bulkOutputLocation);
+             if (bulkBuffSize > 0)
 -                conf.set(AbstractBulkRecordWriter.BUFFER_SIZE_IN_MB, String.valueOf(bulkBuffSize));
++                conf.set(CqlBulkRecordWriter.BUFFER_SIZE_IN_MB, String.valueOf(bulkBuffSize));
+             if (bulkStreamThrottle > 0)
 -                conf.set(AbstractBulkRecordWriter.STREAM_THROTTLE_MBITS, String.valueOf(bulkStreamThrottle));
++                conf.set(CqlBulkRecordWriter.STREAM_THROTTLE_MBITS, String.valueOf(bulkStreamThrottle));
+             if (bulkMaxFailedHosts > 0)
 -                conf.set(AbstractBulkRecordWriter.MAX_FAILED_HOSTS, String.valueOf(bulkMaxFailedHosts));
 -            CqlBulkOutputFormat.setSSLStoragePort(conf, sslStoragePort);
 -            CqlBulkOutputFormat.setStoragePort(conf, storagePort);
 -            if (serverEncrypted())
 -            {
 -                if (!StringUtils.isEmpty(serverKeystore))
 -                    CqlBulkOutputFormat.setServerKeystore(conf, serverKeystore);
 -                if (!StringUtils.isEmpty(serverTruststore))
 -                    CqlBulkOutputFormat.setServerTruststore(conf, serverTruststore);
 -                if (!StringUtils.isEmpty(serverKeystorePassword))
 -                    CqlBulkOutputFormat.setServerKeystorePassword(conf, serverKeystorePassword);
 -                if (!StringUtils.isEmpty(serverTruststorePassword))
 -                    CqlBulkOutputFormat.setServerTruststorePassword(conf, serverTruststorePassword);
 -                if (!StringUtils.isEmpty(serverCipherSuites))
 -                    CqlBulkOutputFormat.setServerCipherSuites(conf, serverCipherSuites);
 -            }
++                conf.set(CqlBulkRecordWriter.MAX_FAILED_HOSTS, String.valueOf(bulkMaxFailedHosts));
++            if (partitionerClass != null)
++                ConfigHelper.setInputPartitioner(conf, partitionerClass);
+         }
+ 
          setConnectionInformation();
  
          if (ConfigHelper.getOutputRpcPort(conf) == 0)
@@@ -773,6 -773,37 +804,25 @@@
                  if (urlQuery.containsKey("output_query"))
                      outputQuery = urlQuery.get("output_query");
  
+                 if (urlQuery.containsKey("bulk_output_format"))
+                     bulkOutputFormat = Boolean.parseBoolean(urlQuery.get("bulk_output_format"));
+                 if (urlQuery.containsKey("bulk_cf_schema"))
+                     bulkCfSchema = urlQuery.get("bulk_cf_schema");
+                 if (urlQuery.containsKey("bulk_insert_statement"))
+                     bulkInsertStatement = urlQuery.get("bulk_insert_statement");
+                 if (urlQuery.containsKey("bulk_output_location"))
+                     bulkOutputLocation = urlQuery.get("bulk_output_location");
+                 if (urlQuery.containsKey("bulk_buff_size"))
+                     bulkBuffSize = Integer.parseInt(urlQuery.get("bulk_buff_size"));
+                 if (urlQuery.containsKey("bulk_stream_throttle"))
+                     bulkStreamThrottle = Integer.parseInt(urlQuery.get("bulk_stream_throttle"));
+                 if (urlQuery.containsKey("bulk_max_failed_hosts"))
+                     bulkMaxFailedHosts = Integer.parseInt(urlQuery.get("bulk_max_failed_hosts"));
 -                if (urlQuery.containsKey("storage_port"))
 -                    storagePort = Integer.valueOf(urlQuery.get("storage_port"));
 -                if (urlQuery.containsKey("ssl_storage_port"))
 -                    sslStoragePort = Integer.valueOf(urlQuery.get("ssl_storage_port"));
 -                if (urlQuery.containsKey("internode_encrypt"))
 -                    internodeEncrypt = urlQuery.get("internode_encrypt");
 -                if (urlQuery.containsKey("server_keystore"))
 -                    serverKeystore = urlQuery.get("server_keystore");
 -                if (urlQuery.containsKey("server_truststore"))
 -                    serverTruststore = urlQuery.get("server_truststore");
 -                if (urlQuery.containsKey("server_keystore_pass"))
 -                    serverKeystorePassword = urlQuery.get("server_keystore_pass");
 -                if (urlQuery.containsKey("server_truststore_pass"))
 -                    serverTruststorePassword = urlQuery.get("server_truststore_pass");
 -                if (urlQuery.containsKey("server_cipher_suites"))
 -                    serverCipherSuites = urlQuery.get("server_cipher_suites");
++                if (urlQuery.containsKey("bulk_delete_source"))
++                    bulkDeleteSourceOnSuccess = Boolean.parseBoolean(urlQuery.get("bulk_delete_source"));
++                if (urlQuery.containsKey("bulk_table_alias"))
++                    bulkTableAlias = urlQuery.get("bulk_table_alias");
+ 
                  //split size
                  if (urlQuery.containsKey("split_size"))
                      splitSize = Integer.parseInt(urlQuery.get("split_size"));
@@@ -855,10 -888,20 +905,13 @@@
                      "[&keep_alive=<keep_alive>][&auth_provider=<auth_provider>][&trust_store_path=<trust_store_path>]" +
                      "[&key_store_path=<key_store_path>][&trust_store_password=<trust_store_password>]" +
                      "[&key_store_password=<key_store_password>][&cipher_suites=<cipher_suites>][&input_cql=<input_cql>]" +
-                     "[columns=<columns>][where_clause=<where_clause>]]': " + e.getMessage());
-         }
+                     "[columns=<columns>][where_clause=<where_clause>]" +
 -                    "[&bulk_cf_schema=bulk_cf_schema][&bulk_insert_statement=bulk_insert_statement]" +
 -                    "[&bulk_output_location=<bulk_output_location>][&bulk_buff_size=<bulk_buff_size>]" +
 -                    "[&storage_port=<storage_port>][&ssl_storage_port=<ssl_storage_port>]" +
 -                    "[&server_keystore=<server_keystore>][&server_keystore_pass=<server_keystore_pass>]" +
 -                    "[&server_truststore=<server_truststore>][&server_truststore_pass=<server_truststore_pass>]" +
 -                    "[&server_cipher_suites=<server_cipher_suites>][&internode_encrypt=<internode_encrypt>]" +
++                    "[&bulk_cf_schema=bulk_cf_schema][&bulk_insert_statement=bulk_insert_statement][&bulk_table_alias=<bulk_table_alias>]" +
++                    "[&bulk_output_location=<bulk_output_location>][&bulk_buff_size=<bulk_buff_size>][&bulk_delete_source=<bulk_delete_source>]" +
+                     "[&bulk_stream_throttle=<bulk_stream_throttle>][&bulk_max_failed_hosts=<bulk_max_failed_hosts>]]': " +  e.getMessage());
+          }
      }
  
 -    /**
 -     * Thrift API can't handle null, so use empty byte array
 -     */
      public ByteBuffer nullToBB()
      {
          return ByteBuffer.wrap(new byte[0]);
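
For reference, a minimal Pig Latin sketch of the bulk-load path wired up above;
the keyspace/table names are hypothetical, connection parameters (init_address,
rpc_port, etc.) are elided, and the bulk_cf_schema / bulk_insert_statement
values must be URL-encoded in a real script, as in the test below. Per the
putNext() javadoc, bulk output tuples carry value tuples only, with no leading
key-map tuple.

    -- hypothetical names; URL parameter values shown decoded for readability
    rows = LOAD 'cql://ks/src?input_cql=<url-encoded select>' USING CqlNativeStorage();
    vals = FOREACH rows GENERATE TOTUPLE(a, b);
    STORE vals INTO 'cql://ks/bulk_table?bulk_output_format=true
        &bulk_cf_schema=CREATE TABLE ks.bulk_table (a int PRIMARY KEY, b int)
        &bulk_insert_statement=INSERT INTO ks.bulk_table (a, b) VALUES (?, ?)
        &bulk_delete_source=true' USING CqlNativeStorage();

(The STORE url is wrapped here for readability; in an actual script it is a
single quoted string.)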

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd4a1e6a/test/conf/cassandra_pig.yaml
----------------------------------------------------------------------
diff --cc test/conf/cassandra_pig.yaml
index 0000000,0000000..68615cf
new file mode 100644
--- /dev/null
+++ b/test/conf/cassandra_pig.yaml
@@@ -1,0 -1,0 +1,41 @@@
++#
++# Warning!
++# Consider the effects on 'o.a.c.i.s.LegacySSTableTest' before changing schemas in this file.
++#
++cluster_name: Test Cluster
++memtable_allocation_type: offheap_objects
++commitlog_sync: batch
++commitlog_sync_batch_window_in_ms: 1.0
++commitlog_segment_size_in_mb: 5
++commitlog_directory: build/test/cassandra/commitlog
++partitioner: org.apache.cassandra.dht.Murmur3Partitioner
++listen_address: 127.0.0.1
++storage_port: 7010
++rpc_port: 9170
++start_native_transport: true
++native_transport_port: 9042
++column_index_size_in_kb: 4
++saved_caches_directory: build/test/cassandra/saved_caches
++data_file_directories:
++    - build/test/cassandra/data
++disk_access_mode: mmap
++seed_provider:
++    - class_name: org.apache.cassandra.locator.SimpleSeedProvider
++      parameters:
++          - seeds: "127.0.0.1"
++endpoint_snitch: org.apache.cassandra.locator.SimpleSnitch
++dynamic_snitch: true
++request_scheduler: org.apache.cassandra.scheduler.RoundRobinScheduler
++request_scheduler_id: keyspace
++server_encryption_options:
++    internode_encryption: none
++    keystore: conf/.keystore
++    keystore_password: cassandra
++    truststore: conf/.truststore
++    truststore_password: cassandra
++incremental_backups: true
++concurrent_compactors: 4
++compaction_throughput_mb_per_sec: 0
++row_cache_class_name: org.apache.cassandra.cache.OHCProvider
++row_cache_size_in_mb: 16
++enable_user_defined_functions: true

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd4a1e6a/test/pig/org/apache/cassandra/pig/CqlTableTest.java
----------------------------------------------------------------------
diff --cc test/pig/org/apache/cassandra/pig/CqlTableTest.java
index 72fdd5a,2e1758e..3902fce
--- a/test/pig/org/apache/cassandra/pig/CqlTableTest.java
+++ b/test/pig/org/apache/cassandra/pig/CqlTableTest.java
@@@ -265,4 -317,33 +272,32 @@@ public class CqlTableTest extends PigTe
              Assert.fail("Can't fetch any data");
          }
      }
+ 
+     @Test
 -    public void testCqlStorageSingleKeyTableBulkLoad()
 -    throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
++    public void testCqlStorageSingleKeyTableBulkLoad() throws TException, IOException
+     {
+         pig.setBatchOn();
+         //input_cql=select * from moredata where token(x) > ? and token(x) <= ?
+         pig.registerQuery("moretestvalues= LOAD 'cql://cql3ks/moredata?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20moredata%20where%20token(x)%20%3E%20%3F%20and%20token(x)%20%3C%3D%20%3F' USING CqlNativeStorage();");
+         pig.registerQuery("insertformat= FOREACH moretestvalues GENERATE TOTUPLE(x, y);");
+         pig.registerQuery("STORE insertformat INTO 'cql://cql3ks/test_bulk?" + defaultParameters + nativeParameters +  "&bulk_output_format=true&bulk_cf_schema=CREATE%20TABLE%20cql3ks.test_bulk%20(a%20int%20PRIMARY%20KEY%2C%20b%20int)&bulk_insert_statement=Insert%20into%20cql3ks.test_bulk(a%2C%20b)%20values(%3F%2C%3F)' USING CqlNativeStorage();");
+         pig.executeBatch();
+ 
+         //(5,5)
+         //(6,6)
+         //(4,4)
+         //(2,2)
+         //(3,3)
+         //(1,1)
+         //input_cql=select * from test_bulk where token(a) > ? and token(a) <= ?
+         pig.registerQuery("result= LOAD 'cql://cql3ks/test_bulk?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20test_bulk%20where%20token(a)%20%3E%20%3F%20and%20token(a)%20%3C%3D%20%3F' USING CqlNativeStorage();");
+         Iterator<Tuple> it = pig.openIterator("result");
+         int count = 0;
+         while (it.hasNext())
+         {
+             Tuple t = it.next();
+             Assert.assertEquals(t.get(0), t.get(1));
+             count++;
+         }
+         Assert.assertEquals(6, count);
+     }
  }
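
For readability, the URL-encoded parameters in testCqlStorageSingleKeyTableBulkLoad
above decode to:

    input_cql (load)      = select * from moredata where token(x) > ? and token(x) <= ?
    bulk_cf_schema        = CREATE TABLE cql3ks.test_bulk (a int PRIMARY KEY, b int)
    bulk_insert_statement = Insert into cql3ks.test_bulk(a, b) values(?,?)
    input_cql (re-read)   = select * from test_bulk where token(a) > ? and token(a) <= ?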

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cd4a1e6a/test/pig/org/apache/cassandra/pig/PigTestBase.java
----------------------------------------------------------------------
diff --cc test/pig/org/apache/cassandra/pig/PigTestBase.java
index 8c27f6c,e6964f8..a8a9de5
--- a/test/pig/org/apache/cassandra/pig/PigTestBase.java
+++ b/test/pig/org/apache/cassandra/pig/PigTestBase.java
@@@ -54,7 -65,8 +54,7 @@@ public class PigTestBase extends Schema
      protected static Configuration conf;
      protected static MiniCluster cluster; 
      protected static PigServer pig;
-     protected static String defaultParameters= "init_address=localhost&rpc_port=9170&partitioner=org.apache.cassandra.dht.ByteOrderedPartitioner";
 -    protected static String defaultParameters= "init_address=localhost&rpc_port=9170&partitioner=org.apache.cassandra.dht.ByteOrderedPartitioner" +
 -                                               "&storage_port=7010&ssl_storage_port=7011&internode_encrypt=NONE";
++    protected static String defaultParameters = "init_address=localhost&rpc_port=9170&partitioner=org.apache.cassandra.dht.Murmur3Partitioner";
      protected static String nativeParameters = "&core_conns=2&max_conns=10&min_simult_reqs=3&max_simult_reqs=10&native_timeout=10000000"  +
                                                 "&native_read_timeout=10000000&send_buff_size=4096&receive_buff_size=4096&solinger=3" +
                                                 "&tcp_nodelay=true&reuse_address=true&keep_alive=true&native_port=9042";
@@@ -62,6 -74,6 +62,7 @@@
      static
      {
          System.setProperty("logback.configurationFile", "logback-test.xml");
++        System.setProperty("cassandra.config", "cassandra_pig.yaml");
      }
  
      @AfterClass