Posted to commits@cassandra.apache.org by al...@apache.org on 2015/05/05 22:56:58 UTC

[1/4] cassandra git commit: Remove Thrift dependencies in bundled tools

Repository: cassandra
Updated Branches:
  refs/heads/trunk 5b6154531 -> f698cc228


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/tools/BulkLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/BulkLoader.java b/src/java/org/apache/cassandra/tools/BulkLoader.java
index 22921e2..51e5e3d 100644
--- a/src/java/org/apache/cassandra/tools/BulkLoader.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoader.java
@@ -18,33 +18,27 @@
 package org.apache.cassandra.tools;
 
 import java.io.File;
+import java.io.IOException;
 import java.net.InetAddress;
 import java.net.MalformedURLException;
 import java.net.UnknownHostException;
 import java.util.*;
 
-import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Multimap;
 import org.apache.commons.cli.*;
 
-import org.apache.cassandra.auth.PasswordAuthenticator;
+import com.datastax.driver.core.SSLOptions;
+import javax.net.ssl.SSLContext;
 import org.apache.cassandra.config.*;
-import org.apache.cassandra.db.SystemKeyspace;
-import org.apache.cassandra.db.marshal.UTF8Type;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.SSTableLoader;
-import org.apache.cassandra.schema.LegacySchemaTables;
+import org.apache.cassandra.security.SSLFactory;
 import org.apache.cassandra.streaming.*;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.NativeSSTableLoaderClient;
 import org.apache.cassandra.utils.OutputHandler;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TTransport;
 
 public class BulkLoader
 {
@@ -54,7 +48,7 @@ public class BulkLoader
     private static final String NOPROGRESS_OPTION  = "no-progress";
     private static final String IGNORE_NODES_OPTION  = "ignore";
     private static final String INITIAL_HOST_ADDRESS_OPTION = "nodes";
-    private static final String RPC_PORT_OPTION = "port";
+    private static final String NATIVE_PORT_OPTION = "port";
     private static final String USER_OPTION = "username";
     private static final String PASSWD_OPTION = "password";
     private static final String THROTTLE_MBITS = "throttle";
@@ -82,13 +76,13 @@ public class BulkLoader
                 options.directory,
                 new ExternalClient(
                         options.hosts,
-                        options.rpcPort,
+                        options.nativePort,
                         options.user,
                         options.passwd,
-                        options.transportFactory,
                         options.storagePort,
                         options.sslStoragePort,
-                        options.serverEncOptions),
+                        options.serverEncOptions,
+                        buildSSLOptions((EncryptionOptions.ClientEncryptionOptions)options.encOptions)),
                 handler,
                 options.connectionsPerHost);
         DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(options.throttle);
@@ -154,8 +148,13 @@ public class BulkLoader
             start = lastTime = System.nanoTime();
         }
 
-        public void onSuccess(StreamState finalState) {}
-        public void onFailure(Throwable t) {}
+        public void onSuccess(StreamState finalState)
+        {
+        }
+
+        public void onFailure(Throwable t)
+        {
+        }
 
         public synchronized void handleStreamEvent(StreamEvent event)
         {
@@ -254,14 +253,27 @@ public class BulkLoader
         }
     }
 
-    static class ExternalClient extends SSTableLoader.Client
+    private static SSLOptions buildSSLOptions(EncryptionOptions.ClientEncryptionOptions clientEncryptionOptions)
+    {
+
+        if (!clientEncryptionOptions.enabled)
+            return null;
+
+        SSLContext sslContext;
+        try
+        {
+            sslContext = SSLFactory.createSSLContext(clientEncryptionOptions, true);
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException("Could not create SSL Context.", e);
+        }
+
+        return new SSLOptions(sslContext, clientEncryptionOptions.cipher_suites);
+    }
+
+    static class ExternalClient extends NativeSSTableLoaderClient
     {
-        private final Map<String, CFMetaData> knownCfs = new HashMap<>();
-        private final Set<InetAddress> hosts;
-        private final int rpcPort;
-        private final String user;
-        private final String passwd;
-        private final ITransportFactory transportFactory;
         private final int storagePort;
         private final int sslStoragePort;
         private final EncryptionOptions.ServerEncryptionOptions serverEncOptions;
@@ -270,103 +282,22 @@ public class BulkLoader
                               int port,
                               String user,
                               String passwd,
-                              ITransportFactory transportFactory,
                               int storagePort,
                               int sslStoragePort,
-                              EncryptionOptions.ServerEncryptionOptions serverEncryptionOptions)
+                              EncryptionOptions.ServerEncryptionOptions serverEncryptionOptions,
+                              SSLOptions sslOptions)
         {
-            super();
-            this.hosts = hosts;
-            this.rpcPort = port;
-            this.user = user;
-            this.passwd = passwd;
-            this.transportFactory = transportFactory;
+            super(hosts, port, user, passwd, sslOptions);
             this.storagePort = storagePort;
             this.sslStoragePort = sslStoragePort;
             this.serverEncOptions = serverEncryptionOptions;
         }
 
         @Override
-        public void init(String keyspace)
-        {
-            Iterator<InetAddress> hostiter = hosts.iterator();
-            while (hostiter.hasNext())
-            {
-                try
-                {
-                    // Query endpoint to ranges map and schemas from thrift
-                    InetAddress host = hostiter.next();
-                    Cassandra.Client client = createThriftClient(host.getHostAddress(), rpcPort, this.user, this.passwd, this.transportFactory);
-
-                    setPartitioner(client.describe_partitioner());
-                    Token.TokenFactory tkFactory = getPartitioner().getTokenFactory();
-
-                    for (TokenRange tr : client.describe_ring(keyspace))
-                    {
-                        Range<Token> range = new Range<>(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token));
-                        for (String ep : tr.endpoints)
-                        {
-                            addRangeForEndpoint(range, InetAddress.getByName(ep));
-                        }
-                    }
-
-                    String cfQuery = String.format("SELECT * FROM %s.%s WHERE keyspace_name = '%s'",
-                                                   SystemKeyspace.NAME,
-                                                   LegacySchemaTables.COLUMNFAMILIES,
-                                                   keyspace);
-                    CqlResult cfRes = client.execute_cql3_query(ByteBufferUtil.bytes(cfQuery), Compression.NONE, ConsistencyLevel.ONE);
-
-
-                    for (CqlRow row : cfRes.rows)
-                    {
-                        String columnFamily = UTF8Type.instance.getString(row.columns.get(1).bufferForName());
-                        String columnsQuery = String.format("SELECT * FROM %s.%s WHERE keyspace_name = '%s' AND columnfamily_name = '%s'",
-                                                            SystemKeyspace.NAME,
-                                                            LegacySchemaTables.COLUMNS,
-                                                            keyspace,
-                                                            columnFamily);
-                        CqlResult columnsRes = client.execute_cql3_query(ByteBufferUtil.bytes(columnsQuery), Compression.NONE, ConsistencyLevel.ONE);
-
-                        CFMetaData metadata = ThriftConversion.fromThriftCqlRow(row, columnsRes);
-                        knownCfs.put(metadata.cfName, metadata);
-                    }
-                    break;
-                }
-                catch (Exception e)
-                {
-                    if (!hostiter.hasNext())
-                        throw new RuntimeException("Could not retrieve endpoint ranges: ", e);
-                }
-            }
-        }
-
-        @Override
         public StreamConnectionFactory getConnectionFactory()
         {
             return new BulkLoadConnectionFactory(storagePort, sslStoragePort, serverEncOptions, false);
         }
-
-        @Override
-        public CFMetaData getCFMetaData(String keyspace, String cfName)
-        {
-            return knownCfs.get(cfName);
-        }
-
-        private static Cassandra.Client createThriftClient(String host, int port, String user, String passwd, ITransportFactory transportFactory) throws Exception
-        {
-            TTransport trans = transportFactory.openTransport(host, port);
-            TProtocol protocol = new TBinaryProtocol(trans);
-            Cassandra.Client client = new Cassandra.Client(protocol);
-            if (user != null && passwd != null)
-            {
-                Map<String, String> credentials = new HashMap<>();
-                credentials.put(PasswordAuthenticator.USERNAME_KEY, user);
-                credentials.put(PasswordAuthenticator.PASSWORD_KEY, passwd);
-                AuthenticationRequest authenticationRequest = new AuthenticationRequest(credentials);
-                client.login(authenticationRequest);
-            }
-            return client;
-        }
     }
 
     static class LoaderOptions
@@ -376,13 +307,12 @@ public class BulkLoader
         public boolean debug;
         public boolean verbose;
         public boolean noProgress;
-        public int rpcPort = 9160;
+        public int nativePort = 9042;
         public String user;
         public String passwd;
         public int throttle = 0;
         public int storagePort;
         public int sslStoragePort;
-        public ITransportFactory transportFactory = new TFramedTransportFactory();
         public EncryptionOptions encOptions = new EncryptionOptions.ClientEncryptionOptions();
         public int connectionsPerHost = 1;
         public EncryptionOptions.ServerEncryptionOptions serverEncOptions = new EncryptionOptions.ServerEncryptionOptions();
@@ -438,8 +368,8 @@ public class BulkLoader
                 opts.verbose = cmd.hasOption(VERBOSE_OPTION);
                 opts.noProgress = cmd.hasOption(NOPROGRESS_OPTION);
 
-                if (cmd.hasOption(RPC_PORT_OPTION))
-                    opts.rpcPort = Integer.parseInt(cmd.getOptionValue(RPC_PORT_OPTION));
+                if (cmd.hasOption(NATIVE_PORT_OPTION))
+                    opts.nativePort = Integer.parseInt(cmd.getOptionValue(NATIVE_PORT_OPTION));
 
                 if (cmd.hasOption(USER_OPTION))
                     opts.user = cmd.getOptionValue(USER_OPTION);
@@ -558,13 +488,6 @@ public class BulkLoader
                     opts.encOptions.cipher_suites = cmd.getOptionValue(SSL_CIPHER_SUITES).split(",");
                 }
 
-                if (cmd.hasOption(TRANSPORT_FACTORY))
-                {
-                    ITransportFactory transportFactory = getTransportFactory(cmd.getOptionValue(TRANSPORT_FACTORY));
-                    configureTransportFactory(transportFactory, opts);
-                    opts.transportFactory = transportFactory;
-                }
-
                 return opts;
             }
             catch (ParseException | ConfigurationException | MalformedURLException e)
@@ -574,50 +497,6 @@ public class BulkLoader
             }
         }
 
-        private static ITransportFactory getTransportFactory(String transportFactory)
-        {
-            try
-            {
-                Class<?> factory = Class.forName(transportFactory);
-                if (!ITransportFactory.class.isAssignableFrom(factory))
-                    throw new IllegalArgumentException(String.format("transport factory '%s' " +
-                            "not derived from ITransportFactory", transportFactory));
-                return (ITransportFactory) factory.newInstance();
-            }
-            catch (Exception e)
-            {
-                throw new IllegalArgumentException(String.format("Cannot create a transport factory '%s'.", transportFactory), e);
-            }
-        }
-
-        private static void configureTransportFactory(ITransportFactory transportFactory, LoaderOptions opts)
-        {
-            Map<String, String> options = new HashMap<>();
-            // If the supplied factory supports the same set of options as our SSL impl, set those 
-            if (transportFactory.supportedOptions().contains(SSLTransportFactory.TRUSTSTORE))
-                options.put(SSLTransportFactory.TRUSTSTORE, opts.encOptions.truststore);
-            if (transportFactory.supportedOptions().contains(SSLTransportFactory.TRUSTSTORE_PASSWORD))
-                options.put(SSLTransportFactory.TRUSTSTORE_PASSWORD, opts.encOptions.truststore_password);
-            if (transportFactory.supportedOptions().contains(SSLTransportFactory.PROTOCOL))
-                options.put(SSLTransportFactory.PROTOCOL, opts.encOptions.protocol);
-            if (transportFactory.supportedOptions().contains(SSLTransportFactory.CIPHER_SUITES))
-                options.put(SSLTransportFactory.CIPHER_SUITES, Joiner.on(',').join(opts.encOptions.cipher_suites));
-
-            if (transportFactory.supportedOptions().contains(SSLTransportFactory.KEYSTORE)
-                    && opts.encOptions.require_client_auth)
-                options.put(SSLTransportFactory.KEYSTORE, opts.encOptions.keystore);
-            if (transportFactory.supportedOptions().contains(SSLTransportFactory.KEYSTORE_PASSWORD)
-                    && opts.encOptions.require_client_auth)
-                options.put(SSLTransportFactory.KEYSTORE_PASSWORD, opts.encOptions.keystore_password);
-
-            // Now check if any of the factory's supported options are set as system properties
-            for (String optionKey : transportFactory.supportedOptions())
-                if (System.getProperty(optionKey) != null)
-                    options.put(optionKey, System.getProperty(optionKey));
-
-            transportFactory.setOptions(options);
-        }
-
         private static void errorMsg(String msg, CmdLineOptions options)
         {
             System.err.println(msg);
@@ -633,7 +512,7 @@ public class BulkLoader
             options.addOption(null, NOPROGRESS_OPTION,   "don't display progress");
             options.addOption("i",  IGNORE_NODES_OPTION, "NODES", "don't stream to this (comma separated) list of nodes");
             options.addOption("d",  INITIAL_HOST_ADDRESS_OPTION, "initial hosts", "Required. try to connect to these hosts (comma separated) initially for ring information");
-            options.addOption("p",  RPC_PORT_OPTION, "rpc port", "port used for rpc (default 9160)");
+            options.addOption("p",  NATIVE_PORT_OPTION, "rpc port", "port used for native connection (default 9042)");
             options.addOption("t",  THROTTLE_MBITS, "throttle", "throttle speed in Mbits (default unlimited)");
             options.addOption("u",  USER_OPTION, "username", "username for cassandra authentication");
             options.addOption("pw", PASSWD_OPTION, "password", "password for cassandra authentication");

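For context, the client-side SSL path that previously flowed through ITransportFactory/SSLTransportFactory now flows through the Java driver: an SSLContext is built from the client encryption options and wrapped in the driver's SSLOptions, which the native client hands to Cluster.Builder. The following is a minimal, illustrative sketch of that wiring, not part of the patch; the contact point and port are assumptions, while the class and field names are taken from the diff above.

import java.net.InetAddress;
import java.util.Collections;
import javax.net.ssl.SSLContext;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.SSLOptions;
import org.apache.cassandra.config.EncryptionOptions;
import org.apache.cassandra.security.SSLFactory;

public class SslWiringSketch
{
    public static Cluster buildCluster(EncryptionOptions.ClientEncryptionOptions encOptions) throws Exception
    {
        Cluster.Builder builder = Cluster.builder()
                                         .addContactPoints(Collections.singleton(InetAddress.getByName("127.0.0.1"))) // assumed contact point
                                         .withPort(9042); // native transport port, the new default for -p

        if (encOptions.enabled)
        {
            // Same two steps as BulkLoader.buildSSLOptions above
            SSLContext ctx = SSLFactory.createSSLContext(encOptions, true);
            builder = builder.withSSL(new SSLOptions(ctx, encOptions.cipher_suites));
        }
        return builder.build();
    }
}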
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
new file mode 100644
index 0000000..1ef686c
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.utils;
+
+import java.net.InetAddress;
+import java.util.*;
+
+import com.datastax.driver.core.*;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.ColumnFamilyType;
+import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.db.composites.CellNameType;
+import org.apache.cassandra.db.composites.CellNames;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.TypeParser;
+import org.apache.cassandra.dht.*;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.SSTableLoader;
+import org.apache.cassandra.schema.LegacySchemaTables;
+
+public class NativeSSTableLoaderClient extends SSTableLoader.Client
+{
+    protected final Map<String, CFMetaData> tables;
+    private final Collection<InetAddress> hosts;
+    private final int port;
+    private final String username;
+    private final String password;
+    private final SSLOptions sslOptions;
+
+    public NativeSSTableLoaderClient(Collection<InetAddress> hosts, int port, String username, String password, SSLOptions sslOptions)
+    {
+        super();
+        this.tables = new HashMap<>();
+        this.hosts = hosts;
+        this.port = port;
+        this.username = username;
+        this.password = password;
+        this.sslOptions = sslOptions;
+    }
+
+    public void init(String keyspace)
+    {
+        Cluster.Builder builder = Cluster.builder().addContactPoints(hosts).withPort(port);
+        if (sslOptions != null)
+            builder.withSSL(sslOptions);
+        if (username != null && password != null)
+            builder = builder.withCredentials(username, password);
+
+        try (Cluster cluster = builder.build())
+        {
+            Session session = cluster.connect();
+            Metadata metadata = cluster.getMetadata();
+
+            setPartitioner(metadata.getPartitioner());
+
+            Set<TokenRange> tokenRanges = metadata.getTokenRanges();
+
+            Token.TokenFactory tokenFactory = getPartitioner().getTokenFactory();
+
+            for (TokenRange tokenRange : tokenRanges)
+            {
+                Set<Host> endpoints = metadata.getReplicas(keyspace, tokenRange);
+                Range<Token> range = new Range<>(tokenFactory.fromString(tokenRange.getStart().getValue().toString()),
+                                                 tokenFactory.fromString(tokenRange.getEnd().getValue().toString()));
+                for (Host endpoint : endpoints)
+                    addRangeForEndpoint(range, endpoint.getAddress());
+            }
+
+            tables.putAll(fetchTablesMetadata(keyspace, session));
+        }
+    }
+
+    public CFMetaData getTableMetadata(String tableName)
+    {
+        return tables.get(tableName);
+    }
+
+    @Override
+    public void setTableMetadata(CFMetaData cfm)
+    {
+        tables.put(cfm.cfName, cfm);
+    }
+
+    private static Map<String, CFMetaData> fetchTablesMetadata(String keyspace, Session session)
+    {
+        Map<String, CFMetaData> tables = new HashMap<>();
+
+        String query = String.format("SELECT columnfamily_name, cf_id, type, comparator, subcomparator, is_dense FROM %s.%s WHERE keyspace_name = '%s'",
+                                     SystemKeyspace.NAME,
+                                     LegacySchemaTables.COLUMNFAMILIES,
+                                     keyspace);
+
+        for (Row row : session.execute(query))
+        {
+            String name = row.getString("columnfamily_name");
+            UUID id = row.getUUID("cf_id");
+            ColumnFamilyType type = ColumnFamilyType.valueOf(row.getString("type"));
+            AbstractType rawComparator = TypeParser.parse(row.getString("comparator"));
+            AbstractType subComparator = row.isNull("subcomparator")
+                                       ? null
+                                       : TypeParser.parse(row.getString("subcomparator"));
+            boolean isDense = row.getBool("is_dense");
+            CellNameType comparator = CellNames.fromAbstractType(CFMetaData.makeRawAbstractType(rawComparator, subComparator),
+                                                                 isDense);
+
+            tables.put(name, new CFMetaData(keyspace, name, type, comparator, id));
+        }
+
+        return tables;
+    }
+}

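The new class above is the native-protocol replacement for the Thrift-based loader client. A hypothetical usage sketch follows (not part of the patch; the directory argument, single localhost contact point, and the absence of auth/SSL are assumptions), mirroring how BulkLoader and the unit tests plug a client into SSTableLoader.

import java.io.File;
import java.net.InetAddress;
import java.util.Collections;

import org.apache.cassandra.io.sstable.SSTableLoader;
import org.apache.cassandra.utils.NativeSSTableLoaderClient;
import org.apache.cassandra.utils.OutputHandler;

public class NativeLoadSketch
{
    public static void main(String[] args) throws Exception
    {
        // Point at a <keyspace>/<table> directory containing the SSTables to stream
        File dir = new File(args[0]);

        // One contact point on the native port; no credentials, no SSL
        NativeSSTableLoaderClient client = new NativeSSTableLoaderClient(
                Collections.singleton(InetAddress.getByName("127.0.0.1")), 9042, null, null, null);

        SSTableLoader loader = new SSTableLoader(dir, client, new OutputHandler.SystemOutput(false, false));
        loader.stream().get(); // block until streaming finishes
    }
}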
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/test/pig/org/apache/cassandra/pig/CqlTableTest.java
----------------------------------------------------------------------
diff --git a/test/pig/org/apache/cassandra/pig/CqlTableTest.java b/test/pig/org/apache/cassandra/pig/CqlTableTest.java
index 26f9f68..72fdd5a 100644
--- a/test/pig/org/apache/cassandra/pig/CqlTableTest.java
+++ b/test/pig/org/apache/cassandra/pig/CqlTableTest.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.util.Iterator;
 
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.thrift.TException;
 import org.junit.Assert;
@@ -70,6 +69,11 @@ public class CqlTableTest extends PigTestBase
             "UPDATE collectiontable SET n['key2'] = 'value2' WHERE m = 'book2';",
             "UPDATE collectiontable SET n['key3'] = 'value3' WHERE m = 'book3';",
             "UPDATE collectiontable SET n['key4'] = 'value4' WHERE m = 'book4';",
+            "CREATE TABLE nulltable(m text PRIMARY KEY, n map<text, text>);",
+            "UPDATE nulltable SET n['key1'] = 'value1' WHERE m = 'book1';",
+            "UPDATE nulltable SET n['key2'] = 'value2' WHERE m = 'book2';",
+            "UPDATE nulltable SET n['key3'] = 'value3' WHERE m = 'book3';",
+            "UPDATE nulltable SET n['key4'] = 'value4' WHERE m = 'book4';",
     };
 
     @BeforeClass
@@ -229,65 +233,32 @@ public class CqlTableTest extends PigTestBase
     }
 
     @Test
-    public void testCassandraStorageSchema() throws IOException
+    public void testCqlNativeStorageNullTuples() throws IOException
     {
-        //results: (key1,{((111,),),((111,column1),100),((111,column2),10.1)})
-        pig.registerQuery("rows = LOAD 'cassandra://cql3ks/cqltable?" + defaultParameters + "' USING CassandraStorage();");
-
-        //schema: {key: chararray,columns: {(name: (),value: bytearray)}}
-        Iterator<Tuple> it = pig.openIterator("rows");
-        if (it.hasNext()) {
-            Tuple t = it.next();
-            String rowKey =  t.get(0).toString();
-            Assert.assertEquals(rowKey, "key1");
-            DataBag columns = (DataBag) t.get(1);
-            Iterator<Tuple> iter = columns.iterator();
-            int i = 0;
-            while (iter.hasNext())
-            {
-                i++;
-                Tuple column = iter.next();
-                if (i==1)
-                {
-                    Assert.assertEquals(((Tuple) column.get(0)).get(0), 111);
-                    Assert.assertEquals(((Tuple) column.get(0)).get(1), "");
-                    Assert.assertEquals(column.get(1).toString(), "");
-                }
-                if (i==2)
-                {
-                    Assert.assertEquals(((Tuple) column.get(0)).get(0), 111);
-                    Assert.assertEquals(((Tuple) column.get(0)).get(1), "column1");
-                    Assert.assertEquals(column.get(1), 100);
-                }
-                if (i==3)
-                {
-                    Assert.assertEquals(((Tuple) column.get(0)).get(0), 111);
-                    Assert.assertEquals(((Tuple) column.get(0)).get(1), "column2");
-                    Assert.assertEquals(column.get(1), 10.1f);
-                }
-            }
-            Assert.assertEquals(3, columns.size());
-        }
-        else
-        {
-            Assert.fail("Can't fetch any data");
-        }
+        //input_cql=select * from nulltable where token(m) > ? and token(m) <= ?
+        NullTupleTest("nulltable= LOAD 'cql://cql3ks/collectiontable?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20nulltable%20where%20token(m)%20%3E%20%3F%20and%20token(m)%20%3C%3D%20%3F' USING CqlNativeStorage();");
+    }
 
-        //results: (key1,(column1,100),(column2,10.1))
-        pig.registerQuery("compact_rows = LOAD 'cassandra://cql3ks/compactcqltable?" + defaultParameters + "' USING CassandraStorage();");
+    private void NullTupleTest(String initialQuery) throws IOException
+    {
+        pig.setBatchOn();
+        pig.registerQuery(initialQuery);
+        pig.registerQuery("recs= FOREACH nulltable GENERATE TOTUPLE(TOTUPLE('m', m) ), TOTUPLE(TOTUPLE('map', TOTUPLE('m', null), TOTUPLE('n', null)));");
+        pig.registerQuery("STORE recs INTO 'cql://cql3ks/nulltable?" + defaultParameters + nativeParameters + "&output_query=update+cql3ks.nulltable+set+n+%3D+%3F' USING CqlNativeStorage();");
+        pig.executeBatch();
 
-        //schema: {key: chararray,column1: (name: chararray,value: int),column2: (name: chararray,value: float)}
-        it = pig.openIterator("compact_rows");
+        pig.registerQuery("result= LOAD 'cql://cql3ks/nulltable?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20nulltable%20where%20token(m)%20%3E%20%3F%20and%20token(m)%20%3C%3D%20%3F' USING CqlNativeStorage();");
+        Iterator<Tuple> it = pig.openIterator("result");
         if (it.hasNext()) {
             Tuple t = it.next();
-            String rowKey =  t.get(0).toString();
-            Assert.assertEquals(rowKey, "key1");
-            Tuple column = (Tuple) t.get(1);
-            Assert.assertEquals(column.get(0), "column1");
-            Assert.assertEquals(column.get(1), 100);
-            column = (Tuple) t.get(2);
-            Assert.assertEquals(column.get(0), "column2");
-            Assert.assertEquals(column.get(1), 10.1f);
+            Tuple t1 = (Tuple) t.get(1);
+            Assert.assertEquals(t1.size(), 2);
+            Tuple element1 = (Tuple) t1.get(0);
+            Tuple element2 = (Tuple) t1.get(1);
+            Assert.assertEquals(element1.get(0), "m");
+            Assert.assertEquals(element1.get(1), "");
+            Assert.assertEquals(element2.get(0), "n");
+            Assert.assertEquals(element2.get(1), "");
         }
         else
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
index 850f46d..6525527 100644
--- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java
@@ -92,16 +92,19 @@ public class CQLSSTableWriterTest
 
         SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client()
         {
+            private String keyspace;
+
             public void init(String keyspace)
             {
+                this.keyspace = keyspace;
                 for (Range<Token> range : StorageService.instance.getLocalRanges("cql_keyspace"))
                     addRangeForEndpoint(range, FBUtilities.getBroadcastAddress());
                 setPartitioner(StorageService.getPartitioner());
             }
 
-            public CFMetaData getCFMetaData(String keyspace, String cfName)
+            public CFMetaData getTableMetadata(String tableName)
             {
-                return Schema.instance.getCFMetaData(keyspace, cfName);
+                return Schema.instance.getCFMetaData(keyspace, tableName);
             }
         }, new OutputHandler.SystemOutput(false, false));
 
@@ -251,16 +254,19 @@ public class CQLSSTableWriterTest
 
         SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client()
         {
+            private String keyspace;
+
             public void init(String keyspace)
             {
+                this.keyspace = keyspace;
                 for (Range<Token> range : StorageService.instance.getLocalRanges(KS))
                     addRangeForEndpoint(range, FBUtilities.getBroadcastAddress());
                 setPartitioner(StorageService.getPartitioner());
             }
 
-            public CFMetaData getCFMetaData(String keyspace, String cfName)
+            public CFMetaData getTableMetadata(String tableName)
             {
-                return Schema.instance.getCFMetaData(keyspace, cfName);
+                return Schema.instance.getCFMetaData(keyspace, tableName);
             }
         }, new OutputHandler.SystemOutput(false, false));
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
index b245994..4a51fbd 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
@@ -83,16 +83,19 @@ public class SSTableLoaderTest
 
         SSTableLoader loader = new SSTableLoader(dataDir, new SSTableLoader.Client()
         {
+            private String keyspace;
+
             public void init(String keyspace)
             {
+                this.keyspace = keyspace;
                 for (Range<Token> range : StorageService.instance.getLocalRanges(KEYSPACE1))
                     addRangeForEndpoint(range, FBUtilities.getBroadcastAddress());
                 setPartitioner(StorageService.getPartitioner());
             }
 
-            public CFMetaData getCFMetaData(String keyspace, String cfName)
+            public CFMetaData getTableMetadata(String tableName)
             {
-                return Schema.instance.getCFMetaData(keyspace, cfName);
+                return Schema.instance.getCFMetaData(keyspace, tableName);
             }
         }, new OutputHandler.SystemOutput(false, false));
 


[4/4] cassandra git commit: Remove Thrift dependencies in bundled tools

Posted by al...@apache.org.
Remove Thrift dependencies in bundled tools

patch by Philip Thompson; reviewed by Aleksey Yeschenko for
CASSANDRA-8358


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f698cc22
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f698cc22
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f698cc22

Branch: refs/heads/trunk
Commit: f698cc228452e847e3ad46bd8178549cf8171767
Parents: 5b61545
Author: Philip Thompson <pt...@gmail.com>
Authored: Tue May 5 21:38:23 2015 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Tue May 5 23:57:39 2015 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   3 +
 build.xml                                       |   8 +-
 lib/cassandra-driver-core-2.1.2.jar             | Bin 638544 -> 0 bytes
 lib/cassandra-driver-core-2.1.5-shaded.jar      | Bin 0 -> 1994984 bytes
 .../apache/cassandra/db/marshal/TypeParser.java |  71 +-
 .../hadoop/AbstractBulkOutputFormat.java        |  73 --
 .../hadoop/AbstractBulkRecordWriter.java        | 239 -------
 .../hadoop/AbstractColumnFamilyInputFormat.java | 245 +++----
 .../AbstractColumnFamilyOutputFormat.java       | 164 -----
 .../AbstractColumnFamilyRecordWriter.java       | 193 -----
 .../cassandra/hadoop/BulkOutputFormat.java      |  51 +-
 .../cassandra/hadoop/BulkRecordWriter.java      | 145 +++-
 .../hadoop/ColumnFamilyInputFormat.java         |  47 +-
 .../hadoop/ColumnFamilyOutputFormat.java        | 119 +++-
 .../hadoop/ColumnFamilyRecordReader.java        |   1 +
 .../hadoop/ColumnFamilyRecordWriter.java        | 124 +++-
 .../hadoop/cql3/CqlBulkOutputFormat.java        |  81 ++-
 .../hadoop/cql3/CqlBulkRecordWriter.java        | 223 ++++--
 .../cassandra/hadoop/cql3/CqlConfigHelper.java  |  52 +-
 .../cassandra/hadoop/cql3/CqlOutputFormat.java  |  76 +-
 .../cassandra/hadoop/cql3/CqlRecordReader.java  |  93 ++-
 .../cassandra/hadoop/cql3/CqlRecordWriter.java  | 392 ++++++----
 .../cassandra/hadoop/pig/CassandraStorage.java  | 706 +++++++++++++++++--
 .../cassandra/hadoop/pig/CqlNativeStorage.java  | 629 +++++++++++------
 .../cassandra/hadoop/pig/StorageHelper.java     | 121 ++++
 .../cassandra/io/sstable/SSTableLoader.java     |  13 +-
 .../cassandra/service/StorageService.java       |  10 +-
 .../org/apache/cassandra/tools/BulkLoader.java  | 209 ++----
 .../utils/NativeSSTableLoaderClient.java        | 126 ++++
 .../org/apache/cassandra/pig/CqlTableTest.java  |  81 +--
 .../io/sstable/CQLSSTableWriterTest.java        |  14 +-
 .../cassandra/io/sstable/SSTableLoaderTest.java |   7 +-
 33 files changed, 2678 insertions(+), 1639 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8f89ece..ab92aa0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0
+ * Remove Thrift dependencies in bundled tools (CASSANDRA-8358)
  * Disable memory mapping of hsperfdata file for JVM statistics (CASSANDRA-9242)
  * Add pre-startup checks to detect potential incompatibilities (CASSANDRA-8049)
  * Distinguish between null and unset in protocol v4 (CASSANDRA-7304)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 03008de..32351a1 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -75,6 +75,9 @@ New features
 Upgrading
 ---------
    - Pig's CqlStorage has been removed, use CqlNativeStorage instead
+   - Pig's CassandraStorage has been deprecated. CassandraStorage
+     should only be used against tables created via Thrift.
+     Use CqlNativeStorage for all other tables.
   - IAuthenticator has been updated to remove responsibility for user/role
     maintenance and is now solely responsible for validating credentials.
     This is primarily done via SASL, though an optional method exists for

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index ba99cd9..a5f195f 100644
--- a/build.xml
+++ b/build.xml
@@ -381,7 +381,7 @@
           <dependency groupId="io.netty" artifactId="netty-all" version="4.0.23.Final" />
           <dependency groupId="com.google.code.findbugs" artifactId="jsr305" version="2.0.2" />
           <dependency groupId="com.clearspring.analytics" artifactId="stream" version="2.5.2" />
-          <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" version="2.1.2" />
+          <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" version="2.1.5" classifier="shaded" />
           <dependency groupId="org.javassist" artifactId="javassist" version="3.18.2-GA" />
           <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core" version="0.3.4" />
           <dependency groupId="net.sf.supercsv" artifactId="super-csv" version="2.1.0" />
@@ -433,7 +433,7 @@
         <dependency groupId="org.apache.pig" artifactId="pig"/>
       	<dependency groupId="com.google.code.findbugs" artifactId="jsr305"/>
         <dependency groupId="org.antlr" artifactId="antlr"/>
-        <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core"/>
+        <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" classifier="shaded"/>
         <dependency groupId="org.javassist" artifactId="javassist"/>
         <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core"/>
         <dependency groupId="org.openjdk.jmh" artifactId="jmh-core"/>
@@ -501,12 +501,12 @@
 
         <dependency groupId="org.apache.thrift" artifactId="libthrift"/>
         <dependency groupId="org.apache.cassandra" artifactId="cassandra-thrift"/>
-        
+
         <!-- don't need hadoop classes to run, but if you use the hadoop stuff -->
         <dependency groupId="org.apache.hadoop" artifactId="hadoop-core" optional="true"/>
         <dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster" optional="true"/>
         <dependency groupId="org.apache.pig" artifactId="pig" optional="true"/>
-      	<dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" optional="true"/>
+        <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" classifier="shaded" optional="true"/>
 
 
         <!-- don't need jna to run, but nice to have -->

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/lib/cassandra-driver-core-2.1.2.jar
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-core-2.1.2.jar b/lib/cassandra-driver-core-2.1.2.jar
deleted file mode 100644
index 2095c05..0000000
Binary files a/lib/cassandra-driver-core-2.1.2.jar and /dev/null differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/lib/cassandra-driver-core-2.1.5-shaded.jar
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-core-2.1.5-shaded.jar b/lib/cassandra-driver-core-2.1.5-shaded.jar
new file mode 100644
index 0000000..bb83fb5
Binary files /dev/null and b/lib/cassandra-driver-core-2.1.5-shaded.jar differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/db/marshal/TypeParser.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/TypeParser.java b/src/java/org/apache/cassandra/db/marshal/TypeParser.java
index ad7ffed..faa678e 100644
--- a/src/java/org/apache/cassandra/db/marshal/TypeParser.java
+++ b/src/java/org/apache/cassandra/db/marshal/TypeParser.java
@@ -21,13 +21,9 @@ import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
+import org.apache.cassandra.cql3.CQL3Type;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -42,7 +38,7 @@ public class TypeParser
     private int idx;
 
     // A cache of parsed string, specially useful for DynamicCompositeType
-    private static final Map<String, AbstractType<?>> cache = new HashMap<String, AbstractType<?>>();
+    private static final Map<String, AbstractType<?>> cache = new HashMap<>();
 
     public static final TypeParser EMPTY_PARSER = new TypeParser("", 0);
 
@@ -98,9 +94,48 @@ public class TypeParser
         return parse(compareWith == null ? null : compareWith.toString());
     }
 
-    public static String getShortName(AbstractType<?> type)
+    public static String parseCqlNativeType(String str)
     {
-        return type.getClass().getSimpleName();
+        return CQL3Type.Native.valueOf(str.trim().toUpperCase(Locale.ENGLISH)).getType().toString();
+    }
+
+    public static String parseCqlCollectionOrFrozenType(String str) throws SyntaxException
+    {
+        str = str.trim().toLowerCase();
+        switch (str)
+        {
+            case "map": return "MapType";
+            case "set": return "SetType";
+            case "list": return "ListType";
+            case "frozen": return "FrozenType";
+            default: throw new SyntaxException("Invalid type name: " + str);
+        }
+    }
+
+    /**
+     * Turns user-facing type names into AbstractTypes, e.g. 'text' -> UTF8Type
+     */
+    public static AbstractType<?> parseCqlName(String str) throws SyntaxException, ConfigurationException
+    {
+        return parse(parseCqlNameRecurse(str));
+    }
+
+    private static String parseCqlNameRecurse(String str) throws SyntaxException
+    {
+        if (str.indexOf(',') >= 0 && (!str.contains("<") || (str.indexOf(',') < str.indexOf('<'))))
+        {
+            String[] parseString = str.split(",", 2);
+            return parseCqlNameRecurse(parseString[0]) + "," + parseCqlNameRecurse(parseString[1]);
+        }
+        else if (str.contains("<"))
+        {
+            String[] parseString = str.trim().split("<", 2);
+            return parseCqlCollectionOrFrozenType(parseString[0]) + "(" + parseCqlNameRecurse(parseString[1].substring(0, parseString[1].length()-1)) + ")";
+        }
+        else
+        {
+            return parseCqlNativeType(str);
+        }
     }
 
     /**
@@ -126,7 +161,7 @@ public class TypeParser
         if (str.charAt(idx) != '(')
             throw new IllegalStateException();
 
-        Map<String, String> map = new HashMap<String, String>();
+        Map<String, String> map = new HashMap<>();
         ++idx; // skipping '('
 
         while (skipBlankAndComma())
@@ -157,7 +192,7 @@ public class TypeParser
 
     public List<AbstractType<?>> getTypeParameters() throws SyntaxException, ConfigurationException
     {
-        List<AbstractType<?>> list = new ArrayList<AbstractType<?>>();
+        List<AbstractType<?>> list = new ArrayList<>();
 
         if (isEOS())
             return list;
@@ -191,7 +226,7 @@ public class TypeParser
 
     public Map<Byte, AbstractType<?>> getAliasParameters() throws SyntaxException, ConfigurationException
     {
-        Map<Byte, AbstractType<?>> map = new HashMap<Byte, AbstractType<?>>();
+        Map<Byte, AbstractType<?>> map = new HashMap<>();
 
         if (isEOS())
             return map;
@@ -384,11 +419,7 @@ public class TypeParser
             Field field = typeClass.getDeclaredField("instance");
             return (AbstractType<?>) field.get(null);
         }
-        catch (NoSuchFieldException e)
-        {
-            throw new ConfigurationException("Invalid comparator class " + typeClass.getName() + ": must define a public static instance field or a public static method getInstance(TypeParser).");
-        }
-        catch (IllegalAccessException e)
+        catch (NoSuchFieldException | IllegalAccessException e)
         {
             throw new ConfigurationException("Invalid comparator class " + typeClass.getName() + ": must define a public static instance field or a public static method getInstance(TypeParser).");
         }
@@ -489,12 +520,6 @@ public class TypeParser
         return str.substring(i, idx);
     }
 
-    public char readNextChar()
-    {
-        skipBlank();
-        return str.charAt(idx++);
-    }
-
     /**
      * Helper function to ease the writing of AbstractType.toString() methods.
      */

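A quick illustration of what the new CQL-name parsing in TypeParser is meant to do (a hypothetical snippet, not part of the patch; the printed forms in the comments are abbreviated, the real toString uses fully-qualified marshal class names):

import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.TypeParser;

public class CqlNameParsingSketch
{
    public static void main(String[] args) throws Exception
    {
        // Native CQL name -> marshal class
        AbstractType<?> text = TypeParser.parseCqlName("text");           // UTF8Type
        // Collection names recurse through parseCqlCollectionOrFrozenType
        AbstractType<?> map  = TypeParser.parseCqlName("map<text, int>"); // roughly MapType(UTF8Type, Int32Type)
        System.out.println(text);
        System.out.println(map);
    }
}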
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java
deleted file mode 100644
index c0e91da..0000000
--- a/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.hadoop;
-
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.*;
-
-public abstract class AbstractBulkOutputFormat<K, V> extends OutputFormat<K, V>
-    implements org.apache.hadoop.mapred.OutputFormat<K, V>
-{
-    @Override
-    public void checkOutputSpecs(JobContext context)
-    {
-        checkOutputSpecs(HadoopCompat.getConfiguration(context));
-    }
-
-    private void checkOutputSpecs(Configuration conf)
-    {
-        if (ConfigHelper.getOutputKeyspace(conf) == null)
-        {
-            throw new UnsupportedOperationException("you must set the keyspace with setColumnFamily()");
-        }
-    }
-
-    @Override
-    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
-    {
-        return new NullOutputCommitter();
-    }
-
-    /** Fills the deprecated OutputFormat interface for streaming. */
-    @Deprecated
-    public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
-    {
-        checkOutputSpecs(job);
-    }
-
-    public static class NullOutputCommitter extends OutputCommitter
-    {
-        public void abortTask(TaskAttemptContext taskContext) { }
-
-        public void cleanupJob(JobContext jobContext) { }
-
-        public void commitTask(TaskAttemptContext taskContext) { }
-
-        public boolean needsTaskCommit(TaskAttemptContext taskContext)
-        {
-            return false;
-        }
-
-        public void setupJob(JobContext jobContext) { }
-
-        public void setupTask(TaskAttemptContext taskContext) { }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java
deleted file mode 100644
index 5ba0a96..0000000
--- a/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.hadoop;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.*;
-import java.util.concurrent.*;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.auth.PasswordAuthenticator;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Config;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.sstable.SSTableLoader;
-import org.apache.cassandra.streaming.StreamState;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.OutputHandler;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.Progressable;
-
-public abstract class AbstractBulkRecordWriter<K, V> extends RecordWriter<K, V>
-implements org.apache.hadoop.mapred.RecordWriter<K, V>
-{
-    public final static String OUTPUT_LOCATION = "mapreduce.output.bulkoutputformat.localdir";
-    public final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize";
-    public final static String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits";
-    public final static String MAX_FAILED_HOSTS = "mapreduce.output.bulkoutputformat.maxfailedhosts";
-    
-    private final Logger logger = LoggerFactory.getLogger(AbstractBulkRecordWriter.class);
-    
-    protected final Configuration conf;
-    protected final int maxFailures;
-    protected final int bufferSize; 
-    protected Closeable writer;
-    protected SSTableLoader loader;
-    protected Progressable progress;
-    protected TaskAttemptContext context;
-    
-    protected AbstractBulkRecordWriter(TaskAttemptContext context)
-    {
-        this(HadoopCompat.getConfiguration(context));
-        this.context = context;
-    }
-
-    protected AbstractBulkRecordWriter(Configuration conf, Progressable progress)
-    {
-        this(conf);
-        this.progress = progress;
-    }
-
-    protected AbstractBulkRecordWriter(Configuration conf)
-    {
-        Config.setOutboundBindAny(true);
-        this.conf = conf;
-        DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.parseInt(conf.get(STREAM_THROTTLE_MBITS, "0")));
-        maxFailures = Integer.parseInt(conf.get(MAX_FAILED_HOSTS, "0"));
-        bufferSize = Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64"));
-    }
-
-    protected String getOutputLocation() throws IOException
-    {
-        String dir = conf.get(OUTPUT_LOCATION, System.getProperty("java.io.tmpdir"));
-        if (dir == null)
-            throw new IOException("Output directory not defined, if hadoop is not setting java.io.tmpdir then define " + OUTPUT_LOCATION);
-        return dir;
-    }
-
-    @Override
-    public void close(TaskAttemptContext context) throws IOException, InterruptedException
-    {
-        close();
-    }
-
-    /** Fills the deprecated RecordWriter interface for streaming. */
-    @Deprecated
-    public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
-    {
-        close();
-    }
-
-    private void close() throws IOException
-    {
-        if (writer != null)
-        {
-            writer.close();
-            Future<StreamState> future = loader.stream();
-            while (true)
-            {
-                try
-                {
-                    future.get(1000, TimeUnit.MILLISECONDS);
-                    break;
-                }
-                catch (ExecutionException | TimeoutException te)
-                {
-                    if (null != progress)
-                        progress.progress();
-                    if (null != context)
-                        HadoopCompat.progress(context);
-                }
-                catch (InterruptedException e)
-                {
-                    throw new IOException(e);
-                }
-            }
-            if (loader.getFailedHosts().size() > 0)
-            {
-                if (loader.getFailedHosts().size() > maxFailures)
-                    throw new IOException("Too many hosts failed: " + loader.getFailedHosts());
-                else
-                    logger.warn("Some hosts failed: {}", loader.getFailedHosts());
-            }
-        }
-    }
-
-    public static class ExternalClient extends SSTableLoader.Client
-    {
-        private final Map<String, Map<String, CFMetaData>> knownCfs = new HashMap<>();
-        private final Configuration conf;
-        private final String hostlist;
-        private final int rpcPort;
-        private final String username;
-        private final String password;
-
-        public ExternalClient(Configuration conf)
-        {
-          super();
-          this.conf = conf;
-          this.hostlist = ConfigHelper.getOutputInitialAddress(conf);
-          this.rpcPort = ConfigHelper.getOutputRpcPort(conf);
-          this.username = ConfigHelper.getOutputKeyspaceUserName(conf);
-          this.password = ConfigHelper.getOutputKeyspacePassword(conf);
-        }
-
-        public void init(String keyspace)
-        {
-            String[] nodes = hostlist.split(",");
-            Set<InetAddress> hosts = new HashSet<InetAddress>(nodes.length);
-            for (String node : nodes)
-            {
-                try
-                {
-                    hosts.add(InetAddress.getByName(node));
-                }
-                catch (UnknownHostException e)
-                {
-                    throw new RuntimeException(e);
-                }
-            }
-            Iterator<InetAddress> hostiter = hosts.iterator();
-            while (hostiter.hasNext())
-            {
-                try
-                {
-                    InetAddress host = hostiter.next();
-                    Cassandra.Client client = ConfigHelper.createConnection(conf, host.getHostAddress(), rpcPort);
-
-                    // log in
-                    client.set_keyspace(keyspace);
-                    if (username != null)
-                    {
-                        Map<String, String> creds = new HashMap<String, String>();
-                        creds.put(PasswordAuthenticator.USERNAME_KEY, username);
-                        creds.put(PasswordAuthenticator.PASSWORD_KEY, password);
-                        AuthenticationRequest authRequest = new AuthenticationRequest(creds);
-                        client.login(authRequest);
-                    }
-
-                    List<TokenRange> tokenRanges = client.describe_ring(keyspace);
-                    List<KsDef> ksDefs = client.describe_keyspaces();
-
-                    setPartitioner(client.describe_partitioner());
-                    Token.TokenFactory tkFactory = getPartitioner().getTokenFactory();
-
-                    for (TokenRange tr : tokenRanges)
-                    {
-                        Range<Token> range = new Range<Token>(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token));
-                        for (String ep : tr.endpoints)
-                        {
-                            addRangeForEndpoint(range, InetAddress.getByName(ep));
-                        }
-                    }
-
-                    for (KsDef ksDef : ksDefs)
-                    {
-                        Map<String, CFMetaData> cfs = new HashMap<>(ksDef.cf_defs.size());
-                        for (CfDef cfDef : ksDef.cf_defs)
-                            cfs.put(cfDef.name, ThriftConversion.fromThrift(cfDef));
-                        knownCfs.put(ksDef.name, cfs);
-                    }
-                    break;
-                }
-                catch (Exception e)
-                {
-                    if (!hostiter.hasNext())
-                        throw new RuntimeException("Could not retrieve endpoint ranges: ", e);
-                }
-            }
-        }
-
-        public CFMetaData getCFMetaData(String keyspace, String cfName)
-        {
-            Map<String, CFMetaData> cfs = knownCfs.get(keyspace);
-            return cfs != null ? cfs.get(cfName) : null;
-        } 
-    }
-
-    public static class NullOutputHandler implements OutputHandler
-    {
-        public void output(String msg) {}
-        public void debug(String msg) {}
-        public void warn(String msg) {}
-        public void warn(String msg, Throwable th) {}
-    }
-}
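
The ExternalClient removed above learned the ring through Thrift describe_ring()/describe_keyspaces(); the replacements introduced later in this patch (NativeSSTableLoaderClient, and the new ExternalClient nested in BulkRecordWriter) get the same information from the DataStax Java driver. As a rough, stand-alone sketch of that driver-based discovery (the contact point and keyspace name are placeholders, not values from the patch):

    import java.util.Set;

    import com.datastax.driver.core.Cluster;
    import com.datastax.driver.core.Host;
    import com.datastax.driver.core.Metadata;
    import com.datastax.driver.core.TokenRange;

    public class RingDiscoverySketch
    {
        public static void main(String[] args)
        {
            // Contact point and keyspace are placeholders; any reachable node will do.
            try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build())
            {
                Metadata metadata = cluster.getMetadata();
                for (TokenRange range : metadata.getTokenRanges())
                {
                    // Replicas per range: the driver-side equivalent of the Thrift
                    // TokenRange.endpoints list consumed by the code removed above.
                    Set<Host> replicas = metadata.getReplicas("my_keyspace", range);
                    System.out.println(range + " -> " + replicas);
                }
            }
        }
    }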

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
index 691bd76..2ef4cf4 100644
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
@@ -18,31 +18,27 @@
 package org.apache.cassandra.hadoop;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.*;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.auth.PasswordAuthenticator;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TokenRange;
+import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.hadoop.cql3.*;
+import org.apache.cassandra.thrift.KeyRange;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.*;
-import org.apache.thrift.TApplicationException;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-
 
 public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<K, Y> implements org.apache.hadoop.mapred.InputFormat<K, Y>
 {
@@ -51,7 +47,7 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
     public static final String MAPRED_TASK_ID = "mapred.task.id";
     // The simple fact that we need this is because the old Hadoop API wants us to "write"
     // to the key and value whereas the new asks for it.
-    // I choose 8kb as the default max key size (instanciated only once), but you can
+    // I choose 8kb as the default max key size (instantiated only once), but you can
     // override it in your jobConf with this setting.
     public static final String CASSANDRA_HADOOP_MAX_KEY_SIZE = "cassandra.hadoop.max_key_size";
     public static final int    CASSANDRA_HADOOP_MAX_KEY_SIZE_DEFAULT = 8192;
@@ -59,6 +55,7 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
     private String keyspace;
     private String cfName;
     private IPartitioner partitioner;
+    private Session session;
 
     protected void validateConfiguration(Configuration conf)
     {
@@ -72,57 +69,27 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
             throw new UnsupportedOperationException("You must set the Cassandra partitioner class with setInputPartitioner");
     }
 
-    public static Cassandra.Client createAuthenticatedClient(String location, int port, Configuration conf) throws Exception
-    {
-        logger.debug("Creating authenticated client for CF input format");
-        TTransport transport;
-        try
-        {
-            transport = ConfigHelper.getClientTransportFactory(conf).openTransport(location, port);
-        }
-        catch (Exception e)
-        {
-            throw new TTransportException("Failed to open a transport to " + location + ":" + port + ".", e);
-        }
-        TProtocol binaryProtocol = new TBinaryProtocol(transport, true, true);
-        Cassandra.Client client = new Cassandra.Client(binaryProtocol);
-
-        // log in
-        client.set_keyspace(ConfigHelper.getInputKeyspace(conf));
-        if ((ConfigHelper.getInputKeyspaceUserName(conf) != null) && (ConfigHelper.getInputKeyspacePassword(conf) != null))
-        {
-            Map<String, String> creds = new HashMap<String, String>();
-            creds.put(PasswordAuthenticator.USERNAME_KEY, ConfigHelper.getInputKeyspaceUserName(conf));
-            creds.put(PasswordAuthenticator.PASSWORD_KEY, ConfigHelper.getInputKeyspacePassword(conf));
-            AuthenticationRequest authRequest = new AuthenticationRequest(creds);
-            client.login(authRequest);
-        }
-        logger.debug("Authenticated client for CF input format created successfully");
-        return client;
-    }
-
     public List<InputSplit> getSplits(JobContext context) throws IOException
     {
-        Configuration conf = HadoopCompat.getConfiguration(context);;
+        Configuration conf = HadoopCompat.getConfiguration(context);
 
         validateConfiguration(conf);
 
-        // cannonical ranges and nodes holding replicas
-        List<TokenRange> masterRangeNodes = getRangeMap(conf);
-
         keyspace = ConfigHelper.getInputKeyspace(conf);
         cfName = ConfigHelper.getInputColumnFamily(conf);
         partitioner = ConfigHelper.getInputPartitioner(conf);
         logger.debug("partitioner is {}", partitioner);
 
+        // canonical ranges and nodes holding replicas
+        Map<TokenRange, Set<Host>> masterRangeNodes = getRangeMap(conf, keyspace);
 
-        // cannonical ranges, split into pieces, fetching the splits in parallel
+        // canonical ranges, split into pieces, fetching the splits in parallel
         ExecutorService executor = new ThreadPoolExecutor(0, 128, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
-        List<InputSplit> splits = new ArrayList<InputSplit>();
+        List<InputSplit> splits = new ArrayList<>();
 
         try
         {
-            List<Future<List<InputSplit>>> splitfutures = new ArrayList<Future<List<InputSplit>>>();
+            List<Future<List<InputSplit>>> splitfutures = new ArrayList<>();
             KeyRange jobKeyRange = ConfigHelper.getInputKeyRange(conf);
             Range<Token> jobRange = null;
             if (jobKeyRange != null)
@@ -130,7 +97,7 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
                 if (jobKeyRange.start_key != null)
                 {
                     if (!partitioner.preservesOrder())
-                        throw new UnsupportedOperationException("KeyRange based on keys can only be used with a order preserving paritioner");
+                        throw new UnsupportedOperationException("KeyRange based on keys can only be used with an order preserving partitioner");
                     if (jobKeyRange.start_token != null)
                         throw new IllegalArgumentException("only start_key supported");
                     if (jobKeyRange.end_token != null)
@@ -149,26 +116,25 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
                 }
             }
 
-            for (TokenRange range : masterRangeNodes)
+            session = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf).connect();
+            Metadata metadata = session.getCluster().getMetadata();
+
+            for (TokenRange range : masterRangeNodes.keySet())
             {
                 if (jobRange == null)
                 {
-                    // for each range, pick a live owner and ask it to compute bite-sized splits
-                    splitfutures.add(executor.submit(new SplitCallable(range, conf)));
+                    // for each tokenRange, pick a live owner and ask it to compute bite-sized splits
+                    splitfutures.add(executor.submit(new SplitCallable(range, masterRangeNodes.get(range), conf)));
                 }
                 else
                 {
-                    Range<Token> dhtRange = new Range<Token>(partitioner.getTokenFactory().fromString(range.start_token),
-                                                             partitioner.getTokenFactory().fromString(range.end_token));
-
-                    if (dhtRange.intersects(jobRange))
+                    TokenRange jobTokenRange = rangeToTokenRange(metadata, jobRange);
+                    if (range.intersects(jobTokenRange))
                     {
-                        for (Range<Token> intersection: dhtRange.intersectionWith(jobRange))
+                        for (TokenRange intersection: range.intersectWith(jobTokenRange))
                         {
-                            range.start_token = partitioner.getTokenFactory().toString(intersection.left);
-                            range.end_token = partitioner.getTokenFactory().toString(intersection.right);
-                            // for each range, pick a live owner and ask it to compute bite-sized splits
-                            splitfutures.add(executor.submit(new SplitCallable(range, conf)));
+                            // for each tokenRange, pick a live owner and ask it to compute bite-sized splits
+                            splitfutures.add(executor.submit(new SplitCallable(intersection,  masterRangeNodes.get(range), conf)));
                         }
                     }
                 }
@@ -197,53 +163,53 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
         return splits;
     }
 
+    private TokenRange rangeToTokenRange(Metadata metadata, Range<Token> range)
+    {
+        return metadata.newTokenRange(metadata.newToken(partitioner.getTokenFactory().toString(range.left)),
+                metadata.newToken(partitioner.getTokenFactory().toString(range.right)));
+    }
+
     /**
-     * Gets a token range and splits it up according to the suggested
+     * Gets a TokenRange and splits it up according to the suggested
      * size into input splits that Hadoop can use.
      */
     class SplitCallable implements Callable<List<InputSplit>>
     {
 
-        private final TokenRange range;
+        private final TokenRange tokenRange;
+        private final Set<Host> hosts;
         private final Configuration conf;
 
-        public SplitCallable(TokenRange tr, Configuration conf)
+        public SplitCallable(TokenRange tr, Set<Host> hosts, Configuration conf)
         {
-            this.range = tr;
+            this.tokenRange = tr;
+            this.hosts = hosts;
             this.conf = conf;
         }
 
         public List<InputSplit> call() throws Exception
         {
-            ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
-            List<CfSplit> subSplits = getSubSplits(keyspace, cfName, range, conf);
-            assert range.rpc_endpoints.size() == range.endpoints.size() : "rpc_endpoints size must match endpoints size";
+            ArrayList<InputSplit> splits = new ArrayList<>();
+            Map<TokenRange, Long> subSplits;
+            subSplits = getSubSplits(keyspace, cfName, tokenRange, conf);
             // turn the sub-ranges into InputSplits
-            String[] endpoints = range.endpoints.toArray(new String[range.endpoints.size()]);
+            String[] endpoints = new String[hosts.size()];
+
             // hadoop needs hostname, not ip
             int endpointIndex = 0;
-            for (String endpoint: range.rpc_endpoints)
-            {
-                String endpoint_address = endpoint;
-                if (endpoint_address == null || endpoint_address.equals("0.0.0.0"))
-                    endpoint_address = range.endpoints.get(endpointIndex);
-                endpoints[endpointIndex++] = InetAddress.getByName(endpoint_address).getHostName();
-            }
+            for (Host endpoint : hosts)
+                endpoints[endpointIndex++] = endpoint.getAddress().getHostName();
 
-            Token.TokenFactory factory = partitioner.getTokenFactory();
-            for (CfSplit subSplit : subSplits)
+            for (TokenRange subSplit : subSplits.keySet())
             {
-                Token left = factory.fromString(subSplit.getStart_token());
-                Token right = factory.fromString(subSplit.getEnd_token());
-                Range<Token> range = new Range<Token>(left, right);
-                List<Range<Token>> ranges = range.isWrapAround() ? range.unwrap() : ImmutableList.of(range);
-                for (Range<Token> subrange : ranges)
+                List<TokenRange> ranges = subSplit.unwrap();
+                for (TokenRange subrange : ranges)
                 {
                     ColumnFamilySplit split =
                             new ColumnFamilySplit(
-                                    factory.toString(subrange.left),
-                                    factory.toString(subrange.right),
-                                    subSplit.getRow_count(),
+                                    subrange.getStart().toString().substring(2),
+                                    subrange.getEnd().toString().substring(2),
+                                    subSplits.get(subSplit),
                                     endpoints);
 
                     logger.debug("adding {}", split);
@@ -254,80 +220,63 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
         }
     }
 
-    private List<CfSplit> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException
+    private Map<TokenRange, Long> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException
     {
-        int splitsize = ConfigHelper.getInputSplitSize(conf);
-        for (int i = 0; i < range.rpc_endpoints.size(); i++)
+        int splitSize = ConfigHelper.getInputSplitSize(conf);
+        try
         {
-            String host = range.rpc_endpoints.get(i);
-
-            if (host == null || host.equals("0.0.0.0"))
-                host = range.endpoints.get(i);
-
-            try
-            {
-                Cassandra.Client client = ConfigHelper.createConnection(conf, host, ConfigHelper.getInputRpcPort(conf));
-                client.set_keyspace(keyspace);
-
-                try
-                {
-                    return client.describe_splits_ex(cfName, range.start_token, range.end_token, splitsize);
-                }
-                catch (TApplicationException e)
-                {
-                    // fallback to guessing split size if talking to a server without describe_splits_ex method
-                    if (e.getType() == TApplicationException.UNKNOWN_METHOD)
-                    {
-                        List<String> splitPoints = client.describe_splits(cfName, range.start_token, range.end_token, splitsize);
-                        return tokenListToSplits(splitPoints, splitsize);
-                    }
-                    throw e;
-                }
-            }
-            catch (IOException e)
-            {
-                logger.debug("failed connect to endpoint {}", host, e);
-            }
-            catch (InvalidRequestException e)
-            {
-                throw new RuntimeException(e);
-            }
-            catch (TException e)
-            {
-                throw new RuntimeException(e);
-            }
+            return describeSplits(keyspace, cfName, range, splitSize);
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
         }
-        throw new IOException("failed connecting to all endpoints " + StringUtils.join(range.endpoints, ","));
     }
 
-    private List<CfSplit> tokenListToSplits(List<String> splitTokens, int splitsize)
+    private Map<TokenRange, Set<Host>> getRangeMap(Configuration conf, String keyspace)
     {
-        List<CfSplit> splits = Lists.newArrayListWithExpectedSize(splitTokens.size() - 1);
-        for (int j = 0; j < splitTokens.size() - 1; j++)
-            splits.add(new CfSplit(splitTokens.get(j), splitTokens.get(j + 1), splitsize));
-        return splits;
+        Session session = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf).connect();
+
+        Map<TokenRange, Set<Host>> map = new HashMap<>();
+        Metadata metadata = session.getCluster().getMetadata();
+        for (TokenRange tokenRange : metadata.getTokenRanges())
+            map.put(tokenRange, metadata.getReplicas(keyspace, tokenRange));
+        return map;
     }
 
-    private List<TokenRange> getRangeMap(Configuration conf) throws IOException
+    private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, int splitSize)
     {
-        Cassandra.Client client = ConfigHelper.getClientFromInputAddressList(conf);
-
-        List<TokenRange> map;
-        try
+        String query = String.format("SELECT mean_partition_size, partitions_count " +
+                                     "FROM %s.%s " +
+                                     "WHERE keyspace_name = ? AND table_name = ? AND range_start = ? AND range_end = ?",
+                                     SystemKeyspace.NAME,
+                                     SystemKeyspace.SIZE_ESTIMATES);
+
+        ResultSet resultSet = session.execute(query, keyspace, table, tokenRange.getStart().toString(), tokenRange.getEnd().toString());
+
+        Row row = resultSet.one();
+        // If we have no data on this split, return the full split i.e., do not sub-split
+        // Assume smallest granularity of partition count available from CASSANDRA-7688
+        if (row == null)
         {
-            map = client.describe_local_ring(ConfigHelper.getInputKeyspace(conf));
+            Map<TokenRange, Long> wrappedTokenRange = new HashMap<>();
+            wrappedTokenRange.put(tokenRange, (long) 128);
+            return wrappedTokenRange;
         }
-        catch (TException e)
-        {
-            throw new RuntimeException(e);
-        }
-        
-        return map;
+
+        long meanPartitionSize = row.getLong("mean_partition_size");
+        long partitionCount = row.getLong("partitions_count");
+
+        int splitCount = (int)((meanPartitionSize * partitionCount) / splitSize);
+        List<TokenRange> splitRanges = tokenRange.splitEvenly(splitCount);
+        Map<TokenRange, Long> rangesWithLength = new HashMap<>();
+        for (TokenRange range : splitRanges)
+            rangesWithLength.put(range, partitionCount/splitCount);
+
+        return rangesWithLength;
     }
 
-    //
     // Old Hadoop API
-    //
     public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException
     {
         TaskAttemptContext tac = HadoopCompat.newTaskAttemptContext(jobConf, new TaskAttemptID());
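
With Thrift's describe_splits_ex() gone, getSplits() now sizes sub-splits from the system.size_estimates table added by CASSANDRA-7688 (assuming SystemKeyspace.NAME and SystemKeyspace.SIZE_ESTIMATES resolve to "system" and "size_estimates"), splitting each token range evenly into roughly (mean_partition_size * partitions_count) / split_size pieces. The same estimates can be inspected by hand; a minimal sketch, assuming a reachable node and placeholder keyspace, table, and token bounds:

    import com.datastax.driver.core.Cluster;
    import com.datastax.driver.core.ResultSet;
    import com.datastax.driver.core.Row;
    import com.datastax.driver.core.Session;

    public class SizeEstimatesSketch
    {
        public static void main(String[] args)
        {
            try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
                 Session session = cluster.connect())
            {
                // Same query shape as describeSplits(); the range bounds are token strings
                // like those produced by TokenRange.getStart()/getEnd().
                ResultSet rs = session.execute("SELECT mean_partition_size, partitions_count " +
                                               "FROM system.size_estimates " +
                                               "WHERE keyspace_name = ? AND table_name = ? " +
                                               "AND range_start = ? AND range_end = ?",
                                               "my_keyspace", "my_table",
                                               "-3074457345618258603", "3074457345618258602");
                Row row = rs.one();
                if (row == null)
                    System.out.println("no estimate for this range (the patch keeps the whole range and assumes 128 partitions)");
                else
                    System.out.println(row.getLong("mean_partition_size") + " bytes/partition, "
                                       + row.getLong("partitions_count") + " partitions");
            }
        }
    }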

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java
deleted file mode 100644
index 03d0045..0000000
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.hadoop;
-
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.auth.PasswordAuthenticator;
-import org.apache.cassandra.thrift.AuthenticationRequest;
-import org.apache.cassandra.thrift.Cassandra;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TTransport;
-
-/**
- * The <code>ColumnFamilyOutputFormat</code> acts as a Hadoop-specific
- * OutputFormat that allows reduce tasks to store keys (and corresponding
- * values) as Cassandra rows (and respective columns) in a given
- * ColumnFamily.
- *
- * <p>
- * As is the case with the {@link ColumnFamilyInputFormat}, you need to set the
- * Keyspace and ColumnFamily in your
- * Hadoop job Configuration. The {@link ConfigHelper} class, through its
- * {@link ConfigHelper#setOutputColumnFamily} method, is provided to make this
- * simple.
- * </p>
- *
- * <p>
- * For the sake of performance, this class employs a lazy write-back caching
- * mechanism, where its record writer batches mutations created based on the
- * reduce's inputs (in a task-specific map), and periodically makes the changes
- * official by sending a batch mutate request to Cassandra.
- * </p>
- * @param <Y>
- */
-public abstract class AbstractColumnFamilyOutputFormat<K, Y> extends OutputFormat<K, Y> implements org.apache.hadoop.mapred.OutputFormat<K, Y>
-{
-    public static final String BATCH_THRESHOLD = "mapreduce.output.columnfamilyoutputformat.batch.threshold";
-    public static final String QUEUE_SIZE = "mapreduce.output.columnfamilyoutputformat.queue.size";
-    private static final Logger logger = LoggerFactory.getLogger(AbstractColumnFamilyOutputFormat.class);
-
-
-    /**
-     * Check for validity of the output-specification for the job.
-     *
-     * @param context
-     *            information about the job
-     */
-    public void checkOutputSpecs(JobContext context)
-    {
-        checkOutputSpecs(HadoopCompat.getConfiguration(context));
-    }
-
-    protected void checkOutputSpecs(Configuration conf)
-    {
-        if (ConfigHelper.getOutputKeyspace(conf) == null)
-            throw new UnsupportedOperationException("You must set the keyspace with setOutputKeyspace()");
-        if (ConfigHelper.getOutputPartitioner(conf) == null)
-            throw new UnsupportedOperationException("You must set the output partitioner to the one used by your Cassandra cluster");
-        if (ConfigHelper.getOutputInitialAddress(conf) == null)
-            throw new UnsupportedOperationException("You must set the initial output address to a Cassandra node");
-    }
-
-    /** Fills the deprecated OutputFormat interface for streaming. */
-    @Deprecated
-    public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
-    {
-        checkOutputSpecs(job);
-    }
-
-    /**
-     * The OutputCommitter for this format does not write any data to the DFS.
-     *
-     * @param context
-     *            the task context
-     * @return an output committer
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
-    {
-        return new NullOutputCommitter();
-    }
-
-    /**
-     * Connects to the given server:port and returns a client based on the given socket that points to the configured
-     * keyspace, and is logged in with the configured credentials.
-     *
-     * @param host fully qualified host name to connect to
-     * @param port RPC port of the server
-     * @param conf a job configuration
-     * @return a cassandra client
-     * @throws Exception set of thrown exceptions may be implementation defined,
-     *                   depending on the used transport factory
-     */
-    public static Cassandra.Client createAuthenticatedClient(String host, int port, Configuration conf) throws Exception
-    {
-        logger.debug("Creating authenticated client for CF output format");
-        TTransport transport = ConfigHelper.getClientTransportFactory(conf).openTransport(host, port);
-        TProtocol binaryProtocol = new TBinaryProtocol(transport, true, true);
-        Cassandra.Client client = new Cassandra.Client(binaryProtocol);
-        client.set_keyspace(ConfigHelper.getOutputKeyspace(conf));
-        String user = ConfigHelper.getOutputKeyspaceUserName(conf);
-        String password = ConfigHelper.getOutputKeyspacePassword(conf);
-        if ((user != null) && (password != null))
-            login(user, password, client);
-
-        logger.debug("Authenticated client for CF output format created successfully");
-        return client;
-    }
-
-    public static void login(String user, String password, Cassandra.Client client) throws Exception
-    {
-        Map<String, String> creds = new HashMap<String, String>();
-        creds.put(PasswordAuthenticator.USERNAME_KEY, user);
-        creds.put(PasswordAuthenticator.PASSWORD_KEY, password);
-        AuthenticationRequest authRequest = new AuthenticationRequest(creds);
-        client.login(authRequest);
-    }
-
-    /**
-     * An {@link OutputCommitter} that does nothing.
-     */
-    private static class NullOutputCommitter extends OutputCommitter
-    {
-        public void abortTask(TaskAttemptContext taskContext) { }
-
-        public void cleanupJob(JobContext jobContext) { }
-
-        public void commitTask(TaskAttemptContext taskContext) { }
-
-        public boolean needsTaskCommit(TaskAttemptContext taskContext)
-        {
-            return false;
-        }
-
-        public void setupJob(JobContext jobContext) { }
-
-        public void setupTask(TaskAttemptContext taskContext) { }
-    }
-}
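
The spec checks and the no-op committer from this deleted class reappear almost verbatim in ColumnFamilyOutputFormat and BulkOutputFormat below, so jobs are configured the same way as before. A sketch of the minimum output-side setup those checks expect, assuming the usual ConfigHelper setters, Hadoop 2's Job.getInstance, and placeholder keyspace, table, and address values:

    import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
    import org.apache.cassandra.hadoop.ConfigHelper;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class OutputJobSetupSketch
    {
        public static Job configure() throws Exception
        {
            Job job = Job.getInstance(new Configuration(), "cassandra-output-example");
            Configuration conf = job.getConfiguration();
            // Satisfies the three UnsupportedOperationException checks in checkOutputSpecs().
            ConfigHelper.setOutputColumnFamily(conf, "my_keyspace", "my_table");
            ConfigHelper.setOutputInitialAddress(conf, "127.0.0.1");
            ConfigHelper.setOutputPartitioner(conf, "Murmur3Partitioner");
            job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
            return job;
        }
    }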

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java
deleted file mode 100644
index cb44beb..0000000
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.hadoop;
-
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.util.*;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.cassandra.client.RingCache;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.thrift.transport.TTransport;
-import org.apache.hadoop.util.Progressable;
-
-
-/**
- * The <code>ColumnFamilyRecordWriter</code> maps the output &lt;key, value&gt;
- * pairs to a Cassandra column family. In particular, it applies all mutations
- * in the value, which it associates with the key, and in turn the responsible
- * endpoint.
- *
- * <p>
- * Furthermore, this writer groups the mutations by the endpoint responsible for
- * the rows being affected. This allows the mutations to be executed in parallel,
- * directly to a responsible endpoint.
- * </p>
- *
- * @see ColumnFamilyOutputFormat
- */
-public abstract class AbstractColumnFamilyRecordWriter<K, Y> extends RecordWriter<K, Y> implements org.apache.hadoop.mapred.RecordWriter<K, Y>
-{
-    // The configuration this writer is associated with.
-    protected final Configuration conf;
-
-    // The ring cache that describes the token ranges each node in the ring is
-    // responsible for. This is what allows us to group the mutations by
-    // the endpoints they should be targeted at. The targeted endpoint
-    // essentially
-    // acts as the primary replica for the rows being affected by the mutations.
-    protected final RingCache ringCache;
-
-    // The number of mutations to buffer per endpoint
-    protected final int queueSize;
-
-    protected final long batchThreshold;
-
-    protected final ConsistencyLevel consistencyLevel;
-    protected Progressable progressable;
-    protected TaskAttemptContext context;
-
-    protected AbstractColumnFamilyRecordWriter(Configuration conf)
-    {
-        this.conf = conf;
-        this.ringCache = new RingCache(conf);
-        this.queueSize = conf.getInt(AbstractColumnFamilyOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors());
-        batchThreshold = conf.getLong(AbstractColumnFamilyOutputFormat.BATCH_THRESHOLD, 32);
-        consistencyLevel = ConsistencyLevel.valueOf(ConfigHelper.getWriteConsistencyLevel(conf));
-    }
-    
-    /**
-     * Close this <code>RecordWriter</code> to future operations, but not before
-     * flushing out the batched mutations.
-     *
-     * @param context the context of the task
-     * @throws IOException
-     */
-    public void close(TaskAttemptContext context) throws IOException, InterruptedException
-    {
-        close();
-    }
-
-    /** Fills the deprecated RecordWriter interface for streaming. */
-    @Deprecated
-    public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
-    {
-        close();
-    }
-    
-    protected abstract void close() throws IOException;
-
-    /**
-     * A client that runs in a threadpool and connects to the list of endpoints for a particular
-     * range. Mutations for keys in that range are sent to this client via a queue.
-     */
-    public abstract class AbstractRangeClient<K> extends Thread
-    {
-        // The list of endpoints for this range
-        protected final List<InetAddress> endpoints;
-        // A bounded queue of incoming mutations for this range
-        protected final BlockingQueue<K> queue = new ArrayBlockingQueue<K>(queueSize);
-
-        protected volatile boolean run = true;
-        // we want the caller to know if something went wrong, so we record any unrecoverable exception while writing
-        // so we can throw it on the caller's stack when he calls put() again, or if there are no more put calls,
-        // when the client is closed.
-        protected volatile IOException lastException;
-
-        protected Cassandra.Client client;
-
-        /**
-         * Constructs an {@link AbstractRangeClient} for the given endpoints.
-         * @param endpoints the possible endpoints to execute the mutations on
-         */
-        public AbstractRangeClient(List<InetAddress> endpoints)
-        {
-            super("client-" + endpoints);
-            this.endpoints = endpoints;
-         }
-
-        /**
-         * enqueues the given value to Cassandra
-         */
-        public void put(K value) throws IOException
-        {
-            while (true)
-            {
-                if (lastException != null)
-                    throw lastException;
-                try
-                {
-                    if (queue.offer(value, 100, TimeUnit.MILLISECONDS))
-                        break;
-                }
-                catch (InterruptedException e)
-                {
-                    throw new AssertionError(e);
-                }
-            }
-        }
-
-        public void close() throws IOException
-        {
-            // stop the run loop.  this will result in closeInternal being called by the time join() finishes.
-            run = false;
-            interrupt();
-            try
-            {
-                this.join();
-            }
-            catch (InterruptedException e)
-            {
-                throw new AssertionError(e);
-            }
-
-            if (lastException != null)
-                throw lastException;
-        }
-
-        protected void closeInternal()
-        {
-            if (client != null)
-            {
-                TTransport transport = client.getOutputProtocol().getTransport();
-                if (transport.isOpen())
-                    transport.close();
-            }
-        }
-
-        /**
-         * Loops collecting mutations from the queue and sending to Cassandra
-         */
-        public abstract void run();
-
-        @Override
-        public String toString()
-        {
-            return "#<Client for " + endpoints + ">";
-        }
-    }
-}
-
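
The AbstractRangeClient scaffolding deleted here survives inside ColumnFamilyRecordWriter.RangeClient at the end of this patch. The producer side is worth calling out: put() blocks on a bounded queue in 100 ms slices so an exception recorded by the writer thread is rethrown to the caller promptly instead of only after a full blocking offer. A self-contained restatement of that pattern (the queue size and generic element type are illustrative):

    import java.io.IOException;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;

    public class BoundedQueueProducerSketch<T>
    {
        private final BlockingQueue<T> queue = new ArrayBlockingQueue<>(32);
        // Set by the consumer thread when a write fails; surfaced on the next put().
        private volatile IOException lastException;

        public void put(T value) throws IOException
        {
            while (true)
            {
                if (lastException != null)
                    throw lastException;
                try
                {
                    // Short timeout so a failure is noticed even while the queue is full.
                    if (queue.offer(value, 100, TimeUnit.MILLISECONDS))
                        break;
                }
                catch (InterruptedException e)
                {
                    throw new AssertionError(e);
                }
            }
        }
    }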

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
index f5a5a8d..5282279 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
@@ -23,9 +23,12 @@ import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.cassandra.thrift.Mutation;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.*;
 
-public class BulkOutputFormat extends AbstractBulkOutputFormat<ByteBuffer,List<Mutation>>
+@Deprecated
+public class BulkOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>>
+        implements org.apache.hadoop.mapred.OutputFormat<ByteBuffer,List<Mutation>>
 {
     /** Fills the deprecated OutputFormat interface for streaming. */
     @Deprecated
@@ -39,4 +42,50 @@ public class BulkOutputFormat extends AbstractBulkOutputFormat<ByteBuffer,List<M
     {
         return new BulkRecordWriter(context);
     }
+
+
+    @Override
+    public void checkOutputSpecs(JobContext context)
+    {
+        checkOutputSpecs(HadoopCompat.getConfiguration(context));
+    }
+
+    private void checkOutputSpecs(Configuration conf)
+    {
+        if (ConfigHelper.getOutputKeyspace(conf) == null)
+        {
+            throw new UnsupportedOperationException("You must set the keyspace with setOutputKeyspace()");
+        }
+    }
+
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
+    {
+        return new NullOutputCommitter();
+    }
+
+    /** Fills the deprecated OutputFormat interface for streaming. */
+    @Deprecated
+    public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
+    {
+        checkOutputSpecs(job);
+    }
+
+    public static class NullOutputCommitter extends OutputCommitter
+    {
+        public void abortTask(TaskAttemptContext taskContext) { }
+
+        public void cleanupJob(JobContext jobContext) { }
+
+        public void commitTask(TaskAttemptContext taskContext) { }
+
+        public boolean needsTaskCommit(TaskAttemptContext taskContext)
+        {
+            return false;
+        }
+
+        public void setupJob(JobContext jobContext) { }
+
+        public void setupTask(TaskAttemptContext taskContext) { }
+    }
 }
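
BulkOutputFormat now carries its own spec check and no-op committer; the knobs it honours are the Configuration keys declared on BulkRecordWriter just below. A short sketch of setting them, using the same defaults the writer falls back to (the values here are illustrative only):

    import org.apache.hadoop.conf.Configuration;

    public class BulkOutputConfigSketch
    {
        public static Configuration configure(Configuration conf)
        {
            // Keys as declared on BulkRecordWriter (OUTPUT_LOCATION, BUFFER_SIZE_IN_MB,
            // STREAM_THROTTLE_MBITS, MAX_FAILED_HOSTS); defaults are java.io.tmpdir, 64, 0, 0.
            conf.set("mapreduce.output.bulkoutputformat.localdir", "/tmp/bulk-sstables");
            conf.set("mapreduce.output.bulkoutputformat.buffersize", "64");
            conf.set("mapreduce.output.bulkoutputformat.streamthrottlembits", "0");
            conf.set("mapreduce.output.bulkoutputformat.maxfailedhosts", "0");
            return conf;
        }
    }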

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
index d67b856..6b9ecb5 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
@@ -17,24 +17,57 @@
  */
 package org.apache.cassandra.hadoop;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
-import java.util.List;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
 import org.apache.cassandra.io.sstable.SSTableLoader;
 import org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter;
+import org.apache.cassandra.streaming.StreamState;
 import org.apache.cassandra.thrift.Column;
 import org.apache.cassandra.thrift.CounterColumn;
 import org.apache.cassandra.thrift.Mutation;
+import org.apache.cassandra.utils.NativeSSTableLoaderClient;
+import org.apache.cassandra.utils.OutputHandler;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.util.Progressable;
 
-public final class BulkRecordWriter extends AbstractBulkRecordWriter<ByteBuffer, List<Mutation>>
+@Deprecated
+public final class BulkRecordWriter extends RecordWriter<ByteBuffer, List<Mutation>>
+        implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer, List<Mutation>>
 {
+    public final static String OUTPUT_LOCATION = "mapreduce.output.bulkoutputformat.localdir";
+    public final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize";
+    public final static String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits";
+    public final static String MAX_FAILED_HOSTS = "mapreduce.output.bulkoutputformat.maxfailedhosts";
+
+    private final Logger logger = LoggerFactory.getLogger(BulkRecordWriter.class);
+
+    protected final Configuration conf;
+    protected final int maxFailures;
+    protected final int bufferSize;
+    protected Closeable writer;
+    protected SSTableLoader loader;
+    protected Progressable progress;
+    protected TaskAttemptContext context;
     private File outputDir;
     
     
@@ -55,17 +88,32 @@ public final class BulkRecordWriter extends AbstractBulkRecordWriter<ByteBuffer,
 
     BulkRecordWriter(TaskAttemptContext context)
     {
-        super(context);
+
+        this(HadoopCompat.getConfiguration(context));
+        this.context = context;
     }
 
     BulkRecordWriter(Configuration conf, Progressable progress)
     {
-        super(conf, progress);
+        this(conf);
+        this.progress = progress;
     }
 
     BulkRecordWriter(Configuration conf)
     {
-        super(conf);
+        Config.setOutboundBindAny(true);
+        this.conf = conf;
+        DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.parseInt(conf.get(STREAM_THROTTLE_MBITS, "0")));
+        maxFailures = Integer.parseInt(conf.get(MAX_FAILED_HOSTS, "0"));
+        bufferSize = Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64"));
+    }
+
+    protected String getOutputLocation() throws IOException
+    {
+        String dir = conf.get(OUTPUT_LOCATION, System.getProperty("java.io.tmpdir"));
+        if (dir == null)
+            throw new IOException("Output directory not defined, if hadoop is not setting java.io.tmpdir then define " + OUTPUT_LOCATION);
+        return dir;
     }
 
     private void setTypes(Mutation mutation)
@@ -115,6 +163,54 @@ public final class BulkRecordWriter extends AbstractBulkRecordWriter<ByteBuffer,
     }
 
     @Override
+    public void close(TaskAttemptContext context) throws IOException, InterruptedException
+    {
+        close();
+    }
+
+    /** Fills the deprecated RecordWriter interface for streaming. */
+    @Deprecated
+    public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
+    {
+        close();
+    }
+
+    private void close() throws IOException
+    {
+        if (writer != null)
+        {
+            writer.close();
+            Future<StreamState> future = loader.stream();
+            while (true)
+            {
+                try
+                {
+                    future.get(1000, TimeUnit.MILLISECONDS);
+                    break;
+                }
+                catch (ExecutionException | TimeoutException te)
+                {
+                    if (null != progress)
+                        progress.progress();
+                    if (null != context)
+                        HadoopCompat.progress(context);
+                }
+                catch (InterruptedException e)
+                {
+                    throw new IOException(e);
+                }
+            }
+            if (loader.getFailedHosts().size() > 0)
+            {
+                if (loader.getFailedHosts().size() > maxFailures)
+                    throw new IOException("Too many hosts failed: " + loader.getFailedHosts());
+                else
+                    logger.warn("Some hosts failed: {}", loader.getFailedHosts());
+            }
+        }
+    }
+
+    @Override
     public void write(ByteBuffer keybuff, List<Mutation> value) throws IOException
     {
         setTypes(value.get(0));
@@ -158,4 +254,43 @@ public final class BulkRecordWriter extends AbstractBulkRecordWriter<ByteBuffer,
                 HadoopCompat.progress(context);
         }
     }
+
+    public static class ExternalClient extends NativeSSTableLoaderClient
+    {
+        public ExternalClient(Configuration conf)
+        {
+            super(resolveHostAddresses(conf),
+                  CqlConfigHelper.getOutputNativePort(conf),
+                  ConfigHelper.getOutputKeyspaceUserName(conf),
+                  ConfigHelper.getOutputKeyspacePassword(conf),
+                  CqlConfigHelper.getSSLOptions(conf).orNull());
+        }
+
+        private static Collection<InetAddress> resolveHostAddresses(Configuration conf)
+        {
+            Set<InetAddress> addresses = new HashSet<>();
+
+            for (String host : ConfigHelper.getOutputInitialAddress(conf).split(","))
+            {
+                try
+                {
+                    addresses.add(InetAddress.getByName(host));
+                }
+                catch (UnknownHostException e)
+                {
+                    throw new RuntimeException(e);
+                }
+            }
+
+            return addresses;
+        }
+    }
+
+    public static class NullOutputHandler implements OutputHandler
+    {
+        public void output(String msg) {}
+        public void debug(String msg) {}
+        public void warn(String msg) {}
+        public void warn(String msg, Throwable th) {}
+    }
 }
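
The new ExternalClient above extends NativeSSTableLoaderClient, so bulk loading from Hadoop no longer needs a Thrift connection; close() simply streams whatever the local SSTable writer produced and polls the future so the task keeps reporting progress. A rough sketch of driving the same pieces outside a task, assuming an SSTableLoader(File, Client, OutputHandler) constructor and a placeholder SSTable directory (the real writer also sets outbound-bind-any and the stream throttle first):

    import java.io.File;
    import java.io.IOException;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    import org.apache.cassandra.hadoop.BulkRecordWriter;
    import org.apache.cassandra.io.sstable.SSTableLoader;
    import org.apache.cassandra.streaming.StreamState;
    import org.apache.hadoop.conf.Configuration;

    public class BulkStreamSketch
    {
        public static void stream(Configuration conf) throws IOException
        {
            // Directory layout is a placeholder; the real writer creates it under OUTPUT_LOCATION.
            SSTableLoader loader = new SSTableLoader(new File("/tmp/bulk-sstables/my_keyspace/my_table"),
                                                     new BulkRecordWriter.ExternalClient(conf),
                                                     new BulkRecordWriter.NullOutputHandler());
            Future<StreamState> future = loader.stream();
            while (true)
            {
                try
                {
                    future.get(1000, TimeUnit.MILLISECONDS);
                    break;
                }
                catch (ExecutionException | TimeoutException e)
                {
                    // Mirrors BulkRecordWriter.close(): keep waiting and (in a task) report progress.
                }
                catch (InterruptedException e)
                {
                    throw new IOException(e);
                }
            }
        }
    }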

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
index 686d486..88dd2e2 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
@@ -21,11 +21,23 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.auth.PasswordAuthenticator;
 import org.apache.cassandra.db.Cell;
+import org.apache.cassandra.thrift.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
 
 /**
  * Hadoop InputFormat allowing map/reduce against Cassandra rows within one ColumnFamily.
@@ -44,9 +56,40 @@ import org.apache.hadoop.mapreduce.*;
  *
  * The default split size is 64k rows.
  */
+@Deprecated
 public class ColumnFamilyInputFormat extends AbstractColumnFamilyInputFormat<ByteBuffer, SortedMap<ByteBuffer, Cell>>
 {
-    
+    private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyInputFormat.class);
+
+    public static Cassandra.Client createAuthenticatedClient(String location, int port, Configuration conf) throws Exception
+    {
+        logger.debug("Creating authenticated client for CF input format");
+        TTransport transport;
+        try
+        {
+            transport = ConfigHelper.getClientTransportFactory(conf).openTransport(location, port);
+        }
+        catch (Exception e)
+        {
+            throw new TTransportException("Failed to open a transport to " + location + ":" + port + ".", e);
+        }
+        TProtocol binaryProtocol = new TBinaryProtocol(transport, true, true);
+        Cassandra.Client client = new Cassandra.Client(binaryProtocol);
+
+        // log in
+        client.set_keyspace(ConfigHelper.getInputKeyspace(conf));
+        if ((ConfigHelper.getInputKeyspaceUserName(conf) != null) && (ConfigHelper.getInputKeyspacePassword(conf) != null))
+        {
+            Map<String, String> creds = new HashMap<String, String>();
+            creds.put(PasswordAuthenticator.USERNAME_KEY, ConfigHelper.getInputKeyspaceUserName(conf));
+            creds.put(PasswordAuthenticator.PASSWORD_KEY, ConfigHelper.getInputKeyspacePassword(conf));
+            AuthenticationRequest authRequest = new AuthenticationRequest(creds);
+            client.login(authRequest);
+        }
+        logger.debug("Authenticated client for CF input format created successfully");
+        return client;
+    }
+
     public RecordReader<ByteBuffer, SortedMap<ByteBuffer, Cell>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
     {
         return new ColumnFamilyRecordReader();
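
createAuthenticatedClient() now lives on the (deprecated) ColumnFamilyInputFormat itself rather than on the old abstract base class. A hypothetical caller, with placeholder host and port, shown mainly for the transport cleanup the helper leaves to its callers; conf must already carry the input keyspace and, if authentication is enabled, credentials:

    import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
    import org.apache.cassandra.thrift.Cassandra;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.thrift.transport.TTransport;

    public class ThriftInputClientSketch
    {
        public static void ping(Configuration conf) throws Exception
        {
            Cassandra.Client client = ColumnFamilyInputFormat.createAuthenticatedClient("127.0.0.1", 9160, conf);
            try
            {
                System.out.println(client.describe_version());
            }
            finally
            {
                // The helper opens the transport; closing it is left to the caller.
                TTransport transport = client.getOutputProtocol().getTransport();
                if (transport.isOpen())
                    transport.close();
            }
        }
    }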

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
index 2990bf3..94ced69 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
@@ -18,10 +18,18 @@
 package org.apache.cassandra.hadoop;
 
 
+import java.io.*;
 import java.nio.ByteBuffer;
-import java.util.List;
+import java.util.*;
+
+import org.slf4j.*;
+
+import org.apache.cassandra.auth.*;
 import org.apache.cassandra.thrift.*;
+import org.apache.hadoop.conf.*;
 import org.apache.hadoop.mapreduce.*;
+import org.apache.thrift.protocol.*;
+import org.apache.thrift.transport.*;
 
 /**
  * The <code>ColumnFamilyOutputFormat</code> acts as a Hadoop-specific
@@ -44,8 +52,93 @@ import org.apache.hadoop.mapreduce.*;
  * official by sending a batch mutate request to Cassandra.
  * </p>
  */
-public class ColumnFamilyOutputFormat extends AbstractColumnFamilyOutputFormat<ByteBuffer,List<Mutation>>
+@Deprecated
+public class ColumnFamilyOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>>
+        implements org.apache.hadoop.mapred.OutputFormat<ByteBuffer,List<Mutation>>
 {
+    public static final String BATCH_THRESHOLD = "mapreduce.output.columnfamilyoutputformat.batch.threshold";
+    public static final String QUEUE_SIZE = "mapreduce.output.columnfamilyoutputformat.queue.size";
+
+    private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyOutputFormat.class);
+
+    /**
+     * The OutputCommitter for this format does not write any data to the DFS.
+     *
+     * @param context
+     *            the task context
+     * @return an output committer
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
+    {
+        return new NullOutputCommitter();
+    }
+
+    /**
+     * Check for validity of the output-specification for the job.
+     *
+     * @param context
+     *            information about the job
+     */
+    public void checkOutputSpecs(JobContext context)
+    {
+        checkOutputSpecs(HadoopCompat.getConfiguration(context));
+    }
+
+    protected void checkOutputSpecs(Configuration conf)
+    {
+        if (ConfigHelper.getOutputKeyspace(conf) == null)
+            throw new UnsupportedOperationException("You must set the keyspace with setOutputKeyspace()");
+        if (ConfigHelper.getOutputPartitioner(conf) == null)
+            throw new UnsupportedOperationException("You must set the output partitioner to the one used by your Cassandra cluster");
+        if (ConfigHelper.getOutputInitialAddress(conf) == null)
+            throw new UnsupportedOperationException("You must set the initial output address to a Cassandra node");
+    }
+
+    /** Fills the deprecated OutputFormat interface for streaming. */
+    @Deprecated
+    public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
+    {
+        checkOutputSpecs(job);
+    }
+
+    /**
+     * Connects to the given server:port and returns a client based on the given socket that points to the configured
+     * keyspace, and is logged in with the configured credentials.
+     *
+     * @param host fully qualified host name to connect to
+     * @param port RPC port of the server
+     * @param conf a job configuration
+     * @return a cassandra client
+     * @throws Exception set of thrown exceptions may be implementation defined,
+     *                   depending on the used transport factory
+     */
+    public static Cassandra.Client createAuthenticatedClient(String host, int port, Configuration conf) throws Exception
+    {
+        logger.debug("Creating authenticated client for CF output format");
+        TTransport transport = ConfigHelper.getClientTransportFactory(conf).openTransport(host, port);
+        TProtocol binaryProtocol = new TBinaryProtocol(transport, true, true);
+        Cassandra.Client client = new Cassandra.Client(binaryProtocol);
+        client.set_keyspace(ConfigHelper.getOutputKeyspace(conf));
+        String user = ConfigHelper.getOutputKeyspaceUserName(conf);
+        String password = ConfigHelper.getOutputKeyspacePassword(conf);
+        if ((user != null) && (password != null))
+            login(user, password, client);
+
+        logger.debug("Authenticated client for CF output format created successfully");
+        return client;
+    }
+
+    public static void login(String user, String password, Cassandra.Client client) throws Exception
+    {
+        Map<String, String> creds = new HashMap<String, String>();
+        creds.put(PasswordAuthenticator.USERNAME_KEY, user);
+        creds.put(PasswordAuthenticator.PASSWORD_KEY, password);
+        AuthenticationRequest authRequest = new AuthenticationRequest(creds);
+        client.login(authRequest);
+    }
+
     /** Fills the deprecated OutputFormat interface for streaming. */
     @Deprecated
     public ColumnFamilyRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress)
@@ -64,4 +157,26 @@ public class ColumnFamilyOutputFormat extends AbstractColumnFamilyOutputFormat<B
     {
         return new ColumnFamilyRecordWriter(context);
     }
+
+    /**
+     * An {@link OutputCommitter} that does nothing.
+     */
+    private static class NullOutputCommitter extends OutputCommitter
+    {
+        public void abortTask(TaskAttemptContext taskContext) { }
+
+        public void cleanupJob(JobContext jobContext) { }
+
+        public void commitTask(TaskAttemptContext taskContext) { }
+
+        public boolean needsTaskCommit(TaskAttemptContext taskContext)
+        {
+            return false;
+        }
+
+        public void setupJob(JobContext jobContext) { }
+
+        public void setupTask(TaskAttemptContext taskContext) { }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
index 35437e9..d205f13 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransport;
 
+@Deprecated
 public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap<ByteBuffer, Cell>>
     implements org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, Cell>>
 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
index d6a873b..31c7047 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
@@ -22,15 +22,19 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.concurrent.*;
 
+import org.apache.cassandra.client.*;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.utils.*;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.*;
 import org.apache.thrift.TException;
 import org.apache.hadoop.util.Progressable;
+import org.apache.thrift.transport.*;
 
 
 /**
@@ -47,10 +51,30 @@ import org.apache.hadoop.util.Progressable;
  *
  * @see ColumnFamilyOutputFormat
  */
-final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<ByteBuffer, List<Mutation>>
+@Deprecated
+final class ColumnFamilyRecordWriter extends RecordWriter<ByteBuffer, List<Mutation>> implements
+        org.apache.hadoop.mapred.RecordWriter<ByteBuffer, List<Mutation>>
 {
+    // The configuration this writer is associated with.
+    protected final Configuration conf;
+
+    // The number of mutations to buffer per endpoint
+    protected final int queueSize;
+
+    protected final long batchThreshold;
+
+    protected final ConsistencyLevel consistencyLevel;
+    protected Progressable progressable;
+    protected TaskAttemptContext context;
     // handles for clients for each range running in the threadpool
     private final Map<Range, RangeClient> clients;
+
+    // The ring cache that describes the token ranges each node in the ring is
+    // responsible for. This is what allows us to group the mutations by
+    // the endpoints they should be targeted at. The targeted endpoint essentially
+    // acts as the primary replica for the rows being affected by the mutations.
+    private final RingCache ringCache;
     
     /**
      * Upon construction, obtain the map that this writer will use to collect
@@ -73,11 +97,33 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<By
 
     ColumnFamilyRecordWriter(Configuration conf)
     {
-        super(conf);
+        this.conf = conf;
+        this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors());
+        batchThreshold = conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32);
+        consistencyLevel = ConsistencyLevel.valueOf(ConfigHelper.getWriteConsistencyLevel(conf));
+        this.ringCache = new RingCache(conf);
         this.clients = new HashMap<Range, RangeClient>();
     }
-    
-    @Override
+
+    /**
+     * Close this <code>RecordWriter</code> to future operations, but not before
+     * flushing out the batched mutations.
+     *
+     * @param context the context of the task
+     * @throws IOException
+     */
+    public void close(TaskAttemptContext context) throws IOException, InterruptedException
+    {
+        close();
+    }
+
+    /** Fills the deprecated RecordWriter interface for streaming. */
+    @Deprecated
+    public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
+    {
+        close();
+    }
+
     public void close() throws IOException
     {
         // close all the clients before throwing anything
@@ -138,8 +184,20 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<By
      * A client that runs in a threadpool and connects to the list of endpoints for a particular
      * range. Mutations for keys in that range are sent to this client via a queue.
      */
-    public class RangeClient extends AbstractRangeClient<Pair<ByteBuffer, Mutation>>
+    public class RangeClient extends Thread
     {
+        // The list of endpoints for this range
+        protected final List<InetAddress> endpoints;
+        // A bounded queue of incoming mutations for this range
+        protected final BlockingQueue<Pair<ByteBuffer, Mutation>> queue = new ArrayBlockingQueue<>(queueSize);
+
+        protected volatile boolean run = true;
+        // we want the caller to know if something went wrong, so we record any unrecoverable exception while writing
+        // so we can throw it on the caller's stack when he calls put() again, or if there are no more put calls,
+        // when the client is closed.
+        protected volatile IOException lastException;
+
+        protected Cassandra.Client client;
         public final String columnFamily = ConfigHelper.getOutputColumnFamily(conf);
         
         /**
@@ -148,8 +206,58 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<By
         */
         public RangeClient(List<InetAddress> endpoints)
         {
-            super(endpoints);
+            super("client-" + endpoints);
+            this.endpoints = endpoints;
          }
+
+        /**
+         * enqueues the given value to Cassandra
+         */
+        public void put(Pair<ByteBuffer, Mutation> value) throws IOException
+        {
+            while (true)
+            {
+                if (lastException != null)
+                    throw lastException;
+                try
+                {
+                    if (queue.offer(value, 100, TimeUnit.MILLISECONDS))
+                        break;
+                }
+                catch (InterruptedException e)
+                {
+                    throw new AssertionError(e);
+                }
+            }
+        }
+
+        public void close() throws IOException
+        {
+            // stop the run loop.  this will result in closeInternal being called by the time join() finishes.
+            run = false;
+            interrupt();
+            try
+            {
+                this.join();
+            }
+            catch (InterruptedException e)
+            {
+                throw new AssertionError(e);
+            }
+
+            if (lastException != null)
+                throw lastException;
+        }
+
+        protected void closeInternal()
+        {
+            if (client != null)
+            {
+                TTransport transport = client.getOutputProtocol().getTransport();
+                if (transport.isOpen())
+                    transport.close();
+            }
+        }
         
         /**
          * Loops collecting mutations from the queue and sending to Cassandra


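RangeClient above is the usual bounded-queue handoff between the Hadoop task thread and a per-range sender thread: put() parks the caller until the queue accepts the mutation, the sender thread drains and writes, and any unrecoverable failure is stashed in lastException so it resurfaces on the next put() or on close(). A condensed, generic sketch of that pattern, using illustrative names (BatchingClient, sendOne) that are not part of this patch:

    // Generic sketch of the producer/consumer handoff used by RangeClient.
    abstract class BatchingClient<T> extends Thread
    {
        private final BlockingQueue<T> queue = new ArrayBlockingQueue<>(64);
        private volatile boolean run = true;
        private volatile IOException lastException;

        // Producer side: block until the bounded queue accepts the value, surfacing earlier failures.
        public void put(T value) throws IOException
        {
            while (true)
            {
                if (lastException != null)
                    throw lastException;
                try
                {
                    if (queue.offer(value, 100, TimeUnit.MILLISECONDS))
                        return;
                }
                catch (InterruptedException e)
                {
                    throw new AssertionError(e);
                }
            }
        }

        // Consumer side: drain until close() clears the flag and the queue is empty.
        public void run()
        {
            while (run || !queue.isEmpty())
            {
                try
                {
                    T value = queue.poll(100, TimeUnit.MILLISECONDS);
                    if (value != null)
                        sendOne(value);
                }
                catch (InterruptedException e)
                {
                    // close() interrupts this thread; the loop condition decides whether to exit
                }
                catch (IOException e)
                {
                    lastException = e; // reported to the caller on the next put() or on close()
                }
            }
        }

        public void close() throws IOException
        {
            run = false;
            interrupt();
            try { join(); } catch (InterruptedException e) { throw new AssertionError(e); }
            if (lastException != null)
                throw lastException;
        }

        protected abstract void sendOne(T value) throws IOException;
    }
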
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
index 78080e2..3899f8c 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
@@ -22,11 +22,14 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 
-import org.apache.cassandra.hadoop.AbstractBulkOutputFormat;
 import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.hadoop.HadoopCompat;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.util.Progressable;
@@ -35,7 +38,7 @@ import org.apache.hadoop.util.Progressable;
  * The <code>CqlBulkOutputFormat</code> acts as a Hadoop-specific
  * OutputFormat that allows reduce tasks to store keys (and corresponding
  * bound variable values) as CQL rows (and respective columns) in a given
- * ColumnFamily.
+ * table.
  *
  * <p>
  * As is the case with the {@link org.apache.cassandra.hadoop.cql3.CqlOutputFormat}, 
@@ -48,13 +51,14 @@ import org.apache.hadoop.util.Progressable;
  * simple.
  * </p>
  */
-public class CqlBulkOutputFormat extends AbstractBulkOutputFormat<Object, List<ByteBuffer>>
+public class CqlBulkOutputFormat extends OutputFormat<Object, List<ByteBuffer>>
+        implements org.apache.hadoop.mapred.OutputFormat<Object, List<ByteBuffer>>
 {   
   
-    private static final String OUTPUT_CQL_SCHEMA_PREFIX = "cassandra.columnfamily.schema.";
-    private static final String OUTPUT_CQL_INSERT_PREFIX = "cassandra.columnfamily.insert.";
+    private static final String OUTPUT_CQL_SCHEMA_PREFIX = "cassandra.table.schema.";
+    private static final String OUTPUT_CQL_INSERT_PREFIX = "cassandra.table.insert.";
     private static final String DELETE_SOURCE = "cassandra.output.delete.source";
-    private static final String COLUMNFAMILY_ALIAS_PREFIX = "cqlbulkoutputformat.columnfamily.alias.";
+    private static final String TABLE_ALIAS_PREFIX = "cqlbulkoutputformat.table.alias.";
   
     /** Fills the deprecated OutputFormat interface for streaming. */
     @Deprecated
@@ -75,33 +79,60 @@ public class CqlBulkOutputFormat extends AbstractBulkOutputFormat<Object, List<B
     {
         return new CqlBulkRecordWriter(context);
     }
+
+    @Override
+    public void checkOutputSpecs(JobContext context)
+    {
+        checkOutputSpecs(HadoopCompat.getConfiguration(context));
+    }
+
+    private void checkOutputSpecs(Configuration conf)
+    {
+        if (ConfigHelper.getOutputKeyspace(conf) == null)
+        {
+            throw new UnsupportedOperationException("you must set the keyspace with setTable()");
+        }
+    }
+
+    /** Fills the deprecated OutputFormat interface for streaming. */
+    @Deprecated
+    public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
+    {
+        checkOutputSpecs(job);
+    }
+
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
+    {
+        return new NullOutputCommitter();
+    }
     
-    public static void setColumnFamilySchema(Configuration conf, String columnFamily, String schema)
+    public static void setTableSchema(Configuration conf, String columnFamily, String schema)
     {
         conf.set(OUTPUT_CQL_SCHEMA_PREFIX + columnFamily, schema);
     }
 
-    public static void setColumnFamilyInsertStatement(Configuration conf, String columnFamily, String insertStatement)
+    public static void setTableInsertStatement(Configuration conf, String columnFamily, String insertStatement)
     {
         conf.set(OUTPUT_CQL_INSERT_PREFIX + columnFamily, insertStatement);
     }
     
-    public static String getColumnFamilySchema(Configuration conf, String columnFamily)
+    public static String getTableSchema(Configuration conf, String columnFamily)
     {
         String schema = conf.get(OUTPUT_CQL_SCHEMA_PREFIX + columnFamily);
         if (schema == null)
         { 
-            throw new UnsupportedOperationException("You must set the ColumnFamily schema using setColumnFamilySchema.");
+            throw new UnsupportedOperationException("You must set the Table schema using setTableSchema.");
         }
         return schema; 
     }
     
-    public static String getColumnFamilyInsertStatement(Configuration conf, String columnFamily)
+    public static String getTableInsertStatement(Configuration conf, String columnFamily)
     {
         String insert = conf.get(OUTPUT_CQL_INSERT_PREFIX + columnFamily); 
         if (insert == null)
         {
-            throw new UnsupportedOperationException("You must set the ColumnFamily insert statement using setColumnFamilySchema.");
+            throw new UnsupportedOperationException("You must set the Table insert statement using setTableSchema.");
         }
         return insert;
     }
@@ -116,13 +147,31 @@ public class CqlBulkOutputFormat extends AbstractBulkOutputFormat<Object, List<B
         return conf.getBoolean(DELETE_SOURCE, false);
     }
     
-    public static void setColumnFamilyAlias(Configuration conf, String alias, String columnFamily)
+    public static void setTableAlias(Configuration conf, String alias, String columnFamily)
     {
-        conf.set(COLUMNFAMILY_ALIAS_PREFIX + alias, columnFamily);
+        conf.set(TABLE_ALIAS_PREFIX + alias, columnFamily);
     }
     
-    public static String getColumnFamilyForAlias(Configuration conf, String alias)
+    public static String getTableForAlias(Configuration conf, String alias)
     {
-        return conf.get(COLUMNFAMILY_ALIAS_PREFIX + alias);
+        return conf.get(TABLE_ALIAS_PREFIX + alias);
+    }
+
+    public static class NullOutputCommitter extends OutputCommitter
+    {
+        public void abortTask(TaskAttemptContext taskContext) { }
+
+        public void cleanupJob(JobContext jobContext) { }
+
+        public void commitTask(TaskAttemptContext taskContext) { }
+
+        public boolean needsTaskCommit(TaskAttemptContext taskContext)
+        {
+            return false;
+        }
+
+        public void setupJob(JobContext jobContext) { }
+
+        public void setupTask(TaskAttemptContext taskContext) { }
     }
 }

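Job setup is unchanged apart from the renamed helpers. A minimal sketch with an illustrative table; the plain ConfigHelper setters named here (setOutputColumnFamily, setOutputInitialAddress, setOutputPartitioner) are assumed from the existing ConfigHelper API and are not part of this hunk:

    Configuration conf = job.getConfiguration();
    ConfigHelper.setOutputColumnFamily(conf, "ks", "words");          // keyspace and table (assumed setter)
    ConfigHelper.setOutputInitialAddress(conf, "127.0.0.1");
    ConfigHelper.setOutputPartitioner(conf, "Murmur3Partitioner");
    CqlBulkOutputFormat.setTableSchema(conf, "words",
        "CREATE TABLE ks.words (word text PRIMARY KEY, count bigint)");
    CqlBulkOutputFormat.setTableInsertStatement(conf, "words",
        "INSERT INTO ks.words (word, count) VALUES (?, ?)");
    job.setOutputFormatClass(CqlBulkOutputFormat.class);

The record writer builds SSTables locally (under mapreduce.output.bulkoutputformat.localdir, falling back to java.io.tmpdir) and streams them to the cluster when the task closes.
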
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
index 60cd511..e77c4c8 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
@@ -17,17 +17,22 @@
  */
 package org.apache.cassandra.hadoop.cql3;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
+import java.util.concurrent.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.hadoop.AbstractBulkRecordWriter;
 import org.apache.cassandra.hadoop.BulkRecordWriter;
 import org.apache.cassandra.hadoop.ConfigHelper;
 import org.apache.cassandra.hadoop.HadoopCompat;
@@ -35,11 +40,12 @@ import org.apache.cassandra.io.sstable.CQLSSTableWriter;
 import org.apache.cassandra.io.sstable.SSTableLoader;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.utils.NativeSSTableLoaderClient;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.util.Progressable;
 
-
 /**
  * The <code>CqlBulkRecordWriter</code> maps the output &lt;key, value&gt;
  * pairs to a Cassandra column family. In particular, it applies the binded variables
@@ -54,10 +60,26 @@ import org.apache.hadoop.util.Progressable;
  *
  * @see CqlBulkOutputFormat
  */
-public class CqlBulkRecordWriter extends AbstractBulkRecordWriter<Object, List<ByteBuffer>>
+public class CqlBulkRecordWriter extends RecordWriter<Object, List<ByteBuffer>>
+        implements org.apache.hadoop.mapred.RecordWriter<Object, List<ByteBuffer>>
 {
+    public final static String OUTPUT_LOCATION = "mapreduce.output.bulkoutputformat.localdir";
+    public final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize";
+    public final static String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits";
+    public final static String MAX_FAILED_HOSTS = "mapreduce.output.bulkoutputformat.maxfailedhosts";
+
+    private final Logger logger = LoggerFactory.getLogger(CqlBulkRecordWriter.class);
+
+    protected final Configuration conf;
+    protected final int maxFailures;
+    protected final int bufferSize;
+    protected Closeable writer;
+    protected SSTableLoader loader;
+    protected Progressable progress;
+    protected TaskAttemptContext context;
+
     private String keyspace;
-    private String columnFamily;
+    private String table;
     private String schema;
     private String insertStatement;
     private File outputDir;
@@ -65,19 +87,25 @@ public class CqlBulkRecordWriter extends AbstractBulkRecordWriter<Object, List<B
 
     CqlBulkRecordWriter(TaskAttemptContext context) throws IOException
     {
-        super(context);
+        this(HadoopCompat.getConfiguration(context));
+        this.context = context;
         setConfigs();
     }
 
     CqlBulkRecordWriter(Configuration conf, Progressable progress) throws IOException
     {
-        super(conf, progress);
+        this(conf);
+        this.progress = progress;
         setConfigs();
     }
 
     CqlBulkRecordWriter(Configuration conf) throws IOException
     {
-        super(conf);
+        Config.setOutboundBindAny(true);
+        this.conf = conf;
+        DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.parseInt(conf.get(STREAM_THROTTLE_MBITS, "0")));
+        maxFailures = Integer.parseInt(conf.get(MAX_FAILED_HOSTS, "0"));
+        bufferSize = Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64"));
         setConfigs();
     }
     
@@ -85,54 +113,55 @@ public class CqlBulkRecordWriter extends AbstractBulkRecordWriter<Object, List<B
     {
         // if anything is missing, exceptions will be thrown here, instead of on write()
         keyspace = ConfigHelper.getOutputKeyspace(conf);
-        columnFamily = ConfigHelper.getOutputColumnFamily(conf);
+        table = ConfigHelper.getOutputColumnFamily(conf);
         
-        // check if columnFamily is aliased
-        String aliasedCf = CqlBulkOutputFormat.getColumnFamilyForAlias(conf, columnFamily);
+        // check if table is aliased
+        String aliasedCf = CqlBulkOutputFormat.getTableForAlias(conf, table);
         if (aliasedCf != null)
-            columnFamily = aliasedCf;
+            table = aliasedCf;
         
-        schema = CqlBulkOutputFormat.getColumnFamilySchema(conf, columnFamily);
-        insertStatement = CqlBulkOutputFormat.getColumnFamilyInsertStatement(conf, columnFamily);
-        outputDir = getColumnFamilyDirectory();
+        schema = CqlBulkOutputFormat.getTableSchema(conf, table);
+        insertStatement = CqlBulkOutputFormat.getTableInsertStatement(conf, table);
+        outputDir = getTableDirectory();
         deleteSrc = CqlBulkOutputFormat.getDeleteSourceOnSuccess(conf);
     }
 
-    
+    protected String getOutputLocation() throws IOException
+    {
+        String dir = conf.get(OUTPUT_LOCATION, System.getProperty("java.io.tmpdir"));
+        if (dir == null)
+            throw new IOException("Output directory not defined, if hadoop is not setting java.io.tmpdir then define " + OUTPUT_LOCATION);
+        return dir;
+    }
+
     private void prepareWriter() throws IOException
     {
-        try
+        if (writer == null)
         {
-            if (writer == null)
-            {
-                writer = CQLSSTableWriter.builder()
-                    .forTable(schema)
-                    .using(insertStatement)
-                    .withPartitioner(ConfigHelper.getOutputPartitioner(conf))
-                    .inDirectory(outputDir)
-                    .withBufferSizeInMB(Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64")))
-                    .build();
-            }
-            if (loader == null)
-            {
-                ExternalClient externalClient = new ExternalClient(conf);
-                
-                externalClient.addKnownCfs(keyspace, schema);
-
-                this.loader = new SSTableLoader(outputDir, externalClient, new BulkRecordWriter.NullOutputHandler()) {
-                    @Override
-                    public void onSuccess(StreamState finalState)
-                    {
-                        if (deleteSrc)
-                            FileUtils.deleteRecursive(outputDir);
-                    }
-                };
-            }
+            writer = CQLSSTableWriter.builder()
+                                     .forTable(schema)
+                                     .using(insertStatement)
+                                     .withPartitioner(ConfigHelper.getOutputPartitioner(conf))
+                                     .inDirectory(outputDir)
+                                     .withBufferSizeInMB(Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64")))
+                                     .build();
         }
-        catch (Exception e)
+
+        if (loader == null)
         {
-            throw new IOException(e);
-        }      
+            ExternalClient externalClient = new ExternalClient(conf);
+            externalClient.setTableMetadata(CFMetaData.compile(schema, keyspace));
+
+            loader = new SSTableLoader(outputDir, externalClient, new BulkRecordWriter.NullOutputHandler())
+            {
+                @Override
+                public void onSuccess(StreamState finalState)
+                {
+                    if (deleteSrc)
+                        FileUtils.deleteRecursive(outputDir);
+                }
+            };
+        }
     }
     
     /**
@@ -168,9 +197,9 @@ public class CqlBulkRecordWriter extends AbstractBulkRecordWriter<Object, List<B
         }
     }
     
-    private File getColumnFamilyDirectory() throws IOException
+    private File getTableDirectory() throws IOException
     {
-        File dir = new File(String.format("%s%s%s%s%s-%s", getOutputLocation(), File.separator, keyspace, File.separator, columnFamily, UUID.randomUUID().toString()));
+        File dir = new File(String.format("%s%s%s%s%s-%s", getOutputLocation(), File.separator, keyspace, File.separator, table, UUID.randomUUID().toString()));
         
         if (!dir.exists() && !dir.mkdirs())
         {
@@ -179,41 +208,83 @@ public class CqlBulkRecordWriter extends AbstractBulkRecordWriter<Object, List<B
         
         return dir;
     }
-    
-    public static class ExternalClient extends AbstractBulkRecordWriter.ExternalClient
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException, InterruptedException
     {
-        private Map<String, Map<String, CFMetaData>> knownCqlCfs = new HashMap<>();
-        
-        public ExternalClient(Configuration conf)
-        {
-            super(conf);
-        }
+        close();
+    }
+
+    /** Fills the deprecated RecordWriter interface for streaming. */
+    @Deprecated
+    public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
+    {
+        close();
+    }
 
-        public void addKnownCfs(String keyspace, String cql)
+    private void close() throws IOException
+    {
+        if (writer != null)
         {
-            Map<String, CFMetaData> cfs = knownCqlCfs.get(keyspace);
-            
-            if (cfs == null)
+            writer.close();
+            Future<StreamState> future = loader.stream();
+            while (true)
             {
-                cfs = new HashMap<>();
-                knownCqlCfs.put(keyspace, cfs);
+                try
+                {
+                    future.get(1000, TimeUnit.MILLISECONDS);
+                    break;
+                }
+                catch (ExecutionException | TimeoutException te)
+                {
+                    if (null != progress)
+                        progress.progress();
+                    if (null != context)
+                        HadoopCompat.progress(context);
+                }
+                catch (InterruptedException e)
+                {
+                    throw new IOException(e);
+                }
+            }
+            if (loader.getFailedHosts().size() > 0)
+            {
+                if (loader.getFailedHosts().size() > maxFailures)
+                    throw new IOException("Too many hosts failed: " + loader.getFailedHosts());
+                else
+                    logger.warn("Some hosts failed: {}", loader.getFailedHosts());
             }
-            
-            CFMetaData metadata = CFMetaData.compile(cql, keyspace);
-            cfs.put(metadata.cfName, metadata);
         }
-        
-        @Override
-        public CFMetaData getCFMetaData(String keyspace, String cfName)
+    }
+    
+    public static class ExternalClient extends NativeSSTableLoaderClient
+    {
+        public ExternalClient(Configuration conf)
         {
-            CFMetaData metadata = super.getCFMetaData(keyspace, cfName);
-            if (metadata != null)
+            super(resolveHostAddresses(conf),
+                  CqlConfigHelper.getOutputNativePort(conf),
+                  ConfigHelper.getOutputKeyspaceUserName(conf),
+                  ConfigHelper.getOutputKeyspacePassword(conf),
+                  CqlConfigHelper.getSSLOptions(conf).orNull());
+        }
+
+        private static Collection<InetAddress> resolveHostAddresses(Configuration conf)
+        {
+            Set<InetAddress> addresses = new HashSet<>();
+
+            for (String host : ConfigHelper.getOutputInitialAddress(conf).split(","))
             {
-                return metadata;
+                try
+                {
+                    addresses.add(InetAddress.getByName(host));
+                }
+                catch (UnknownHostException e)
+                {
+                    throw new RuntimeException(e);
+                }
             }
-            
-            Map<String, CFMetaData> cfs = knownCqlCfs.get(keyspace);
-            return cfs != null ? cfs.get(cfName) : null;
+
+            return addresses;
         }
     }
 }

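The new close() has two jobs: keep the Hadoop task from being timed out while the SSTables stream, and enforce the failed-host budget afterwards. A condensed sketch of that wait loop; awaitStreaming is an illustrative name, and unlike the code above it surfaces a failed stream directly rather than relying only on the failed-host count:

    // Illustrative helper mirroring the wait in close(): poll the stream future,
    // ping the progress reporter on every timeout, then check the failed-host budget.
    static void awaitStreaming(SSTableLoader loader, Progressable progress, int maxFailures) throws IOException
    {
        Future<StreamState> future = loader.stream();
        while (true)
        {
            try
            {
                future.get(1, TimeUnit.SECONDS);
                break;
            }
            catch (TimeoutException te)
            {
                progress.progress(); // keep the task alive while streaming continues
            }
            catch (ExecutionException e)
            {
                throw new IOException("Streaming failed", e.getCause());
            }
            catch (InterruptedException e)
            {
                throw new IOException(e);
            }
        }
        if (loader.getFailedHosts().size() > maxFailures)
            throw new IOException("Too many hosts failed: " + loader.getFailedHosts());
    }
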
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
index ac5a7e5..3033fa6 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
@@ -34,22 +34,23 @@ import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.TrustManagerFactory;
 
-import org.apache.cassandra.hadoop.ConfigHelper;
-import org.apache.cassandra.io.util.FileUtils;
+import com.google.common.base.Optional;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
 
 import com.datastax.driver.core.AuthProvider;
-import com.datastax.driver.core.PlainTextAuthProvider;
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.PlainTextAuthProvider;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
 import com.datastax.driver.core.PoolingOptions;
 import com.datastax.driver.core.ProtocolOptions;
 import com.datastax.driver.core.QueryOptions;
 import com.datastax.driver.core.SSLOptions;
 import com.datastax.driver.core.SocketOptions;
-import com.datastax.driver.core.policies.LoadBalancingPolicy;
-import com.google.common.base.Optional;
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+
 
 public class CqlConfigHelper
 {
@@ -84,6 +85,7 @@ public class CqlConfigHelper
     private static final String INPUT_NATIVE_PROTOCOL_VERSION = "cassandra.input.native.protocol.version";
 
     private static final String OUTPUT_CQL = "cassandra.output.cql";
+    private static final String OUTPUT_NATIVE_PORT = "cassandra.output.native.port";
     
     /**
      * Set the CQL columns for the input of this job.
@@ -176,6 +178,11 @@ public class CqlConfigHelper
         return Integer.parseInt(conf.get(INPUT_NATIVE_PORT, "9042"));
     }
 
+    public static int getOutputNativePort(Configuration conf)
+    {
+        return Integer.parseInt(conf.get(OUTPUT_NATIVE_PORT, "9042"));
+    }
+
     public static Optional<Integer> getInputMinSimultReqPerConnections(Configuration conf)
     {
         return getIntSetting(INPUT_NATIVE_MIN_SIMULT_REQ_PER_CONNECTION, conf);
@@ -294,6 +301,22 @@ public class CqlConfigHelper
     public static Cluster getInputCluster(String[] hosts, Configuration conf)
     {
         int port = getInputNativePort(conf);
+        return getCluster(hosts, conf, port);
+    }
+
+    public static Cluster getOutputCluster(String host, Configuration conf)
+    {
+        return getOutputCluster(new String[]{host}, conf);
+    }
+
+    public static Cluster getOutputCluster(String[] hosts, Configuration conf)
+    {
+        int port = getOutputNativePort(conf);
+        return getCluster(hosts, conf, port);
+    }
+
+    public static Cluster getCluster(String[] hosts, Configuration conf, int port)
+    {
         Optional<AuthProvider> authProvider = getAuthProvider(conf);
         Optional<SSLOptions> sslOptions = getSSLOptions(conf);
         Optional<Integer> protocolVersion = getProtocolVersion(conf);
@@ -301,11 +324,11 @@ public class CqlConfigHelper
         SocketOptions socketOptions = getReadSocketOptions(conf);
         QueryOptions queryOptions = getReadQueryOptions(conf);
         PoolingOptions poolingOptions = getReadPoolingOptions(conf);
-        
+
         Cluster.Builder builder = Cluster.builder()
-                                         .addContactPoints(hosts)
-                                         .withPort(port)
-                                         .withCompression(ProtocolOptions.Compression.NONE);
+                .addContactPoints(hosts)
+                .withPort(port)
+                .withCompression(ProtocolOptions.Compression.NONE);
 
         if (authProvider.isPresent())
             builder.withAuthProvider(authProvider.get());
@@ -316,14 +339,13 @@ public class CqlConfigHelper
             builder.withProtocolVersion(protocolVersion.get());
         }
         builder.withLoadBalancingPolicy(loadBalancingPolicy)
-               .withSocketOptions(socketOptions)
-               .withQueryOptions(queryOptions)
-               .withPoolingOptions(poolingOptions);
+                .withSocketOptions(socketOptions)
+                .withQueryOptions(queryOptions)
+                .withPoolingOptions(poolingOptions);
 
         return builder.build();
     }
 
-
     public static void setInputCoreConnections(Configuration conf, String connections)
     {
         conf.set(INPUT_NATIVE_CORE_CONNECTIONS_PER_HOST, connections);
@@ -502,7 +524,7 @@ public class CqlConfigHelper
         return Optional.of(getClientAuthProvider(authProvider.get(), conf));
     }
 
-    private static Optional<SSLOptions> getSSLOptions(Configuration conf)
+    public static Optional<SSLOptions> getSSLOptions(Configuration conf)
     {
         Optional<String> truststorePath = getInputNativeSSLTruststorePath(conf);
         Optional<String> keystorePath = getInputNativeSSLKeystorePath(conf);

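With the output-side additions, obtaining a native-protocol session for the write path is a single call chain; cassandra.output.native.port defaults to 9042 as shown above. A minimal sketch, with an illustrative helper name:

    // Illustrative helper: open a driver session against the job's configured output cluster and keyspace.
    static Session connectToOutputCluster(Configuration conf)
    {
        Cluster cluster = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf);
        return cluster.connect(ConfigHelper.getOutputKeyspace(conf));
    }

The caller is responsible for closing the Cluster (session.getCluster().close()) once it is done with the session.
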
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
index 0d09ca2..9a1cda6 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
@@ -23,15 +23,15 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.cassandra.hadoop.AbstractColumnFamilyOutputFormat;
-import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.hadoop.*;
+import org.apache.hadoop.conf.*;
 import org.apache.hadoop.mapreduce.*;
 
 /**
- * The <code>ColumnFamilyOutputFormat</code> acts as a Hadoop-specific
+ * The <code>CqlOutputFormat</code> acts as a Hadoop-specific
  * OutputFormat that allows reduce tasks to store keys (and corresponding
  * bound variable values) as CQL rows (and respective columns) in a given
- * ColumnFamily.
+ * table.
  *
  * <p>
  * As is the case with the {@link org.apache.cassandra.hadoop.ColumnFamilyInputFormat}, 
@@ -52,8 +52,51 @@ import org.apache.hadoop.mapreduce.*;
  * to Cassandra.
  * </p>
  */
-public class CqlOutputFormat extends AbstractColumnFamilyOutputFormat<Map<String, ByteBuffer>, List<ByteBuffer>>
-{   
+public class CqlOutputFormat extends OutputFormat<Map<String, ByteBuffer>, List<ByteBuffer>>
+        implements org.apache.hadoop.mapred.OutputFormat<Map<String, ByteBuffer>, List<ByteBuffer>>
+{
+    /**
+     * Check for validity of the output-specification for the job.
+     *
+     * @param context
+     *            information about the job
+     */
+    public void checkOutputSpecs(JobContext context)
+    {
+        checkOutputSpecs(HadoopCompat.getConfiguration(context));
+    }
+
+    protected void checkOutputSpecs(Configuration conf)
+    {
+        if (ConfigHelper.getOutputKeyspace(conf) == null)
+            throw new UnsupportedOperationException("You must set the keyspace with setOutputKeyspace()");
+        if (ConfigHelper.getOutputPartitioner(conf) == null)
+            throw new UnsupportedOperationException("You must set the output partitioner to the one used by your Cassandra cluster");
+        if (ConfigHelper.getOutputInitialAddress(conf) == null)
+            throw new UnsupportedOperationException("You must set the initial output address to a Cassandra node");
+    }
+
+    /** Fills the deprecated OutputFormat interface for streaming. */
+    @Deprecated
+    public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
+    {
+        checkOutputSpecs(job);
+    }
+
+    /**
+     * The OutputCommitter for this format does not write any data to the DFS.
+     *
+     * @param context
+     *            the task context
+     * @return an output committer
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
+    {
+        return new NullOutputCommitter();
+    }
+
     /** Fills the deprecated OutputFormat interface for streaming. */
     @Deprecated
     public CqlRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress) throws IOException
@@ -73,4 +116,25 @@ public class CqlOutputFormat extends AbstractColumnFamilyOutputFormat<Map<String
     {
         return new CqlRecordWriter(context);
     }
+
+    /**
+     * An {@link OutputCommitter} that does nothing.
+     */
+    private static class NullOutputCommitter extends OutputCommitter
+    {
+        public void abortTask(TaskAttemptContext taskContext) { }
+
+        public void cleanupJob(JobContext jobContext) { }
+
+        public void commitTask(TaskAttemptContext taskContext) { }
+
+        public boolean needsTaskCommit(TaskAttemptContext taskContext)
+        {
+            return false;
+        }
+
+        public void setupJob(JobContext jobContext) { }
+
+        public void setupTask(TaskAttemptContext taskContext) { }
+    }
 }

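checkOutputSpecs above spells out what a job must provide: the output keyspace, the cluster's partitioner, and an initial contact address, plus the output CQL itself, which the record writer requires to be an UPDATE or DELETE and to which it appends the primary-key WHERE clauses automatically. On the reducer side the contract looks like the following sketch, where the word/count columns (and an assumed output statement such as "UPDATE ks.words SET count = ?") are illustrative:

    // Sketch of a reducer emitting to CqlOutputFormat: the key map carries the primary-key
    // columns, the value list carries the bound variables of the configured UPDATE.
    public class WordCountReducer extends Reducer<Text, LongWritable, Map<String, ByteBuffer>, List<ByteBuffer>>
    {
        @Override
        protected void reduce(Text word, Iterable<LongWritable> counts, Context context) throws IOException, InterruptedException
        {
            long sum = 0;
            for (LongWritable count : counts)
                sum += count.get();

            Map<String, ByteBuffer> keys = new LinkedHashMap<>();
            keys.put("word", ByteBufferUtil.bytes(word.toString()));
            context.write(keys, Collections.singletonList(ByteBufferUtil.bytes(sum)));
        }
    }
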
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
index 308bdf8..4a7bd59 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordReader.java
@@ -37,13 +37,15 @@ import org.slf4j.LoggerFactory;
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.ColumnDefinitions;
 import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.Metadata;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TableMetadata;
+import com.datastax.driver.core.Token;
 import com.datastax.driver.core.TupleValue;
 import com.datastax.driver.core.UDTValue;
-import org.apache.cassandra.schema.LegacySchemaTables;
-import org.apache.cassandra.db.SystemKeyspace;
+import com.google.common.reflect.TypeToken;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.hadoop.ColumnFamilySplit;
@@ -493,36 +495,72 @@ public class CqlRecordReader extends RecordReader<Long, Row>
         }
 
         @Override
+        public <T> List<T> getList(int i, TypeToken<T> typeToken)
+        {
+            return row.getList(i, typeToken);
+        }
+
+        @Override
         public <T> List<T> getList(String name, Class<T> elementsClass)
         {
             return row.getList(name, elementsClass);
         }
 
         @Override
+        public <T> List<T> getList(String s, TypeToken<T> typeToken)
+        {
+            return row.getList(s, typeToken);
+        }
+
+        @Override
         public <T> Set<T> getSet(int i, Class<T> elementsClass)
         {
             return row.getSet(i, elementsClass);
         }
 
         @Override
+        public <T> Set<T> getSet(int i, TypeToken<T> typeToken)
+        {
+            return row.getSet(i, typeToken);
+        }
+
+        @Override
         public <T> Set<T> getSet(String name, Class<T> elementsClass)
         {
             return row.getSet(name, elementsClass);
         }
 
         @Override
+        public <T> Set<T> getSet(String s, TypeToken<T> typeToken)
+        {
+            return row.getSet(s, typeToken);
+        }
+
+        @Override
         public <K, V> Map<K, V> getMap(int i, Class<K> keysClass, Class<V> valuesClass)
         {
             return row.getMap(i, keysClass, valuesClass);
         }
 
         @Override
+        public <K, V> Map<K, V> getMap(int i, TypeToken<K> typeToken, TypeToken<V> typeToken1)
+        {
+            return row.getMap(i, typeToken, typeToken1);
+        }
+
+        @Override
         public <K, V> Map<K, V> getMap(String name, Class<K> keysClass, Class<V> valuesClass)
         {
             return row.getMap(name, keysClass, valuesClass);
         }
 
         @Override
+        public <K, V> Map<K, V> getMap(String s, TypeToken<K> typeToken, TypeToken<V> typeToken1)
+        {
+            return row.getMap(s, typeToken, typeToken1);
+        }
+
+        @Override
         public UDTValue getUDTValue(int i)
         {
             return row.getUDTValue(i);
@@ -545,6 +583,24 @@ public class CqlRecordReader extends RecordReader<Long, Row>
         {
             return row.getTupleValue(name);
         }
+
+        @Override
+        public Token getToken(int i)
+        {
+            return row.getToken(i);
+        }
+
+        @Override
+        public Token getToken(String name)
+        {
+            return row.getToken(name);
+        }
+
+        @Override
+        public Token getPartitionKeyToken()
+        {
+            return row.getPartitionKeyToken();
+        }
     }
 
     /**
@@ -604,36 +660,21 @@ public class CqlRecordReader extends RecordReader<Long, Row>
 
     private void fetchKeys()
     {
-        String query = String.format("SELECT column_name, component_index, type " +
-                                     "FROM %s.%s " +
-                                     "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'",
-                                     SystemKeyspace.NAME,
-                                     LegacySchemaTables.COLUMNS,
-                                     keyspace,
-                                     cfName);
-
         // get CF meta data
-        List<Row> rows = session.execute(query).all();
-        if (rows.isEmpty())
+        TableMetadata tableMetadata = session.getCluster()
+                                             .getMetadata()
+                                             .getKeyspace(Metadata.quote(keyspace))
+                                             .getTable(Metadata.quote(cfName));
+        if (tableMetadata == null)
         {
             throw new RuntimeException("No table metadata found for " + keyspace + "." + cfName);
         }
-        int numberOfPartitionKeys = 0;
-        for (Row row : rows)
-            if (row.getString(2).equals("partition_key"))
-                numberOfPartitionKeys++;
-        String[] partitionKeyArray = new String[numberOfPartitionKeys];
-        for (Row row : rows)
+        //Here we assume that tableMetadata.getPartitionKey() always
+        //returns the list of columns in order of component_index
+        for (ColumnMetadata partitionKey : tableMetadata.getPartitionKey())
         {
-            String type = row.getString(2);
-            String column = row.getString(0);
-            if (type.equals("partition_key"))
-            {
-                int componentIndex = row.isNull(1) ? 0 : row.getInt(1);
-                partitionKeyArray[componentIndex] = column;
-            }
+            partitionKeys.add(partitionKey.getName());
         }
-        partitionKeys.addAll(Arrays.asList(partitionKeyArray));
     }
 
     private String quote(String identifier)

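On the read side nothing changes for user code: the reader still hands each mapper a driver Row keyed by a Long record number, with the partition key columns now discovered through the driver's table metadata instead of a system-table query. A minimal consuming mapper, with illustrative column names:

    // Sketch of a mapper consuming CqlRecordReader output; the word/count columns are illustrative.
    public class WordCountMapper extends Mapper<Long, Row, Text, LongWritable>
    {
        @Override
        protected void map(Long recordNumber, Row row, Context context) throws IOException, InterruptedException
        {
            context.write(new Text(row.getString("word")), new LongWritable(row.getLong("count")));
        }
    }
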
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
index dbbeb47..1d8436b 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlRecordWriter.java
@@ -21,37 +21,39 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.*;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.schema.LegacySchemaTables;
-import org.apache.cassandra.db.SystemKeyspace;
+
+import com.datastax.driver.core.exceptions.AuthenticationException;
+import com.datastax.driver.core.exceptions.InvalidQueryException;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TableMetadata;
+import com.datastax.driver.core.TokenRange;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.CompositeType;
 import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.db.marshal.TypeParser;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.dht.*;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.SyntaxException;
-import org.apache.cassandra.hadoop.AbstractColumnFamilyOutputFormat;
-import org.apache.cassandra.hadoop.AbstractColumnFamilyRecordWriter;
-import org.apache.cassandra.hadoop.ConfigHelper;
-import org.apache.cassandra.hadoop.HadoopCompat;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.hadoop.*;
+import org.apache.cassandra.utils.*;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.*;
 import org.apache.hadoop.util.Progressable;
-import org.apache.thrift.TException;
-import org.apache.thrift.transport.TTransport;
 
 /**
- * The <code>ColumnFamilyRecordWriter</code> maps the output &lt;key, value&gt;
- * pairs to a Cassandra column family. In particular, it applies the binded variables
+ * The <code>CqlRecordWriter</code> maps the output &lt;key, value&gt;
+ * pairs to a Cassandra table. In particular, it applies the bound variables
  * in the value to the prepared statement, which it associates with the key, and in 
  * turn the responsible endpoint.
  *
@@ -63,21 +65,38 @@ import org.apache.thrift.transport.TTransport;
  *
  * @see CqlOutputFormat
  */
-class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>> implements AutoCloseable
+class CqlRecordWriter extends RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>> implements
+        org.apache.hadoop.mapred.RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>>, AutoCloseable
 {
     private static final Logger logger = LoggerFactory.getLogger(CqlRecordWriter.class);
 
+    // The configuration this writer is associated with.
+    protected final Configuration conf;
+    // The number of mutations to buffer per endpoint
+    protected final int queueSize;
+
+    protected final long batchThreshold;
+
+    protected Progressable progressable;
+    protected TaskAttemptContext context;
+
+    // The ring cache that describes the token ranges each node in the ring is
+    // responsible for. This is what allows us to group the mutations by
+    // the endpoints they should be targeted at. The targeted endpoint essentially
+    // acts as the primary replica for the rows being affected by the mutations.
+    private final NativeRingCache ringCache;
+
     // handles for clients for each range running in the threadpool
     protected final Map<InetAddress, RangeClient> clients;
 
     // host to prepared statement id mappings
-    protected final ConcurrentHashMap<Cassandra.Client, Integer> preparedStatements = new ConcurrentHashMap<Cassandra.Client, Integer>();
+    protected final ConcurrentHashMap<Session, PreparedStatement> preparedStatements = new ConcurrentHashMap<Session, PreparedStatement>();
 
     protected final String cql;
 
-    protected AbstractType<?> keyValidator;
-    protected String [] partitionKeyColumns;
-    protected List<String> clusterColumns;
+    protected List<ColumnMetadata> partitionKeyColumns;
+    protected List<ColumnMetadata> clusterColumns;
 
     /**
      * Upon construction, obtain the map that this writer will use to collect
@@ -100,28 +119,28 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB
 
     CqlRecordWriter(Configuration conf)
     {
-        super(conf);
+        this.conf = conf;
+        this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors());
+        batchThreshold = conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32);
         this.clients = new HashMap<>();
 
         try
         {
-            Cassandra.Client client = ConfigHelper.getClientFromOutputAddressList(conf);
+            String keyspace = ConfigHelper.getOutputKeyspace(conf);
+            Session client = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf).connect(keyspace);
+            ringCache = new NativeRingCache(conf);
             if (client != null)
             {
-                client.set_keyspace(ConfigHelper.getOutputKeyspace(conf));
-                String user = ConfigHelper.getOutputKeyspaceUserName(conf);
-                String password = ConfigHelper.getOutputKeyspacePassword(conf);
-                if ((user != null) && (password != null))
-                    AbstractColumnFamilyOutputFormat.login(user, password, client);
-                retrievePartitionKeyValidator(client);
+                TableMetadata tableMetadata = client.getCluster().getMetadata().getKeyspace(client.getLoggedKeyspace()).getTable(ConfigHelper.getOutputColumnFamily(conf));
+                clusterColumns = tableMetadata.getClusteringColumns();
+                partitionKeyColumns = tableMetadata.getPartitionKey();
+
                 String cqlQuery = CqlConfigHelper.getOutputCql(conf).trim();
                 if (cqlQuery.toLowerCase().startsWith("insert"))
                     throw new UnsupportedOperationException("INSERT with CqlRecordWriter is not supported, please use UPDATE/DELETE statement");
                 cql = appendKeyWhereClauses(cqlQuery);
 
-                TTransport transport = client.getOutputProtocol().getTransport();
-                if (transport.isOpen())
-                    transport.close();
+                client.close();
             }
             else
             {
@@ -133,7 +152,26 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB
             throw new RuntimeException(e);
         }
     }
-    
+
+    /**
+     * Close this <code>RecordWriter</code> to future operations, but not before
+     * flushing out the batched mutations.
+     *
+     * @param context the context of the task
+     * @throws IOException
+     */
+    public void close(TaskAttemptContext context) throws IOException, InterruptedException
+    {
+        close();
+    }
+
+    /** Fills the deprecated RecordWriter interface for streaming. */
+    @Deprecated
+    public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
+    {
+        close();
+    }
+
     @Override
     public void close() throws IOException
     {
@@ -157,7 +195,7 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB
     
     /**
      * If the key is to be associated with a valid value, a mutation is created
-     * for it with the given column family and columns. In the event the value
+     * for it with the given table and columns. In the event the value
      * in the column is missing (i.e., null), then it is marked for
      * {@link Deletion}. Similarly, if the entire value for a key is missing
      * (i.e., null), then the entire key is marked for {@link Deletion}.
@@ -172,25 +210,25 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB
     @Override
     public void write(Map<String, ByteBuffer> keyColumns, List<ByteBuffer> values) throws IOException
     {
-        Range<Token> range = ringCache.getRange(getPartitionKey(keyColumns));
+        TokenRange range = ringCache.getRange(getPartitionKey(keyColumns));
 
         // get the client for the given range, or create a new one
-	final InetAddress address = ringCache.getEndpoint(range).get(0);
+	final InetAddress address = ringCache.getEndpoints(range).get(0);
         RangeClient client = clients.get(address);
         if (client == null)
         {
             // haven't seen keys for this range: create new client
-            client = new RangeClient(ringCache.getEndpoint(range));
+            client = new RangeClient(ringCache.getEndpoints(range));
             client.start();
             clients.put(address, client);
         }
 
         // add primary key columns to the bind variables
         List<ByteBuffer> allValues = new ArrayList<ByteBuffer>(values);
-        for (String column : partitionKeyColumns)
-            allValues.add(keyColumns.get(column));
-        for (String column : clusterColumns)
-            allValues.add(keyColumns.get(column));
+        for (ColumnMetadata column : partitionKeyColumns)
+            allValues.add(keyColumns.get(column.getName()));
+        for (ColumnMetadata column : clusterColumns)
+            allValues.add(keyColumns.get(column.getName()));
 
         client.put(allValues);
 
@@ -204,16 +242,50 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB
      * A client that runs in a threadpool and connects to the list of endpoints for a particular
      * range. Bound variables for keys in that range are sent to this client via a queue.
      */
-    public class RangeClient extends AbstractRangeClient<List<ByteBuffer>>
+    public class RangeClient extends Thread
     {
+        // The list of endpoints for this range
+        protected final List<InetAddress> endpoints;
+        protected Session client;
+        // A bounded queue of incoming mutations for this range
+        protected final BlockingQueue<List<ByteBuffer>> queue = new ArrayBlockingQueue<List<ByteBuffer>>(queueSize);
+
+        protected volatile boolean run = true;
+        // we want the caller to know if something went wrong, so we record any unrecoverable exception while writing
+        // so we can throw it on the caller's stack when he calls put() again, or if there are no more put calls,
+        // when the client is closed.
+        protected volatile IOException lastException;
+
         /**
          * Constructs an {@link RangeClient} for the given endpoints.
          * @param endpoints the possible endpoints to execute the mutations on
          */
         public RangeClient(List<InetAddress> endpoints)
         {
-            super(endpoints);
-         }
+            super("client-" + endpoints);
+            this.endpoints = endpoints;
+        }
+
+        /**
+         * enqueues the given value to Cassandra
+         */
+        public void put(List<ByteBuffer> value) throws IOException
+        {
+            while (true)
+            {
+                if (lastException != null)
+                    throw lastException;
+                try
+                {
+                    if (queue.offer(value, 100, TimeUnit.MILLISECONDS))
+                        break;
+                }
+                catch (InterruptedException e)
+                {
+                    throw new AssertionError(e);
+                }
+            }
+        }
         
         /**
          * Loops collecting cql binded variable values from the queue and sending to Cassandra
@@ -234,156 +306,138 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB
                     continue;
                 }
 
-                Iterator<InetAddress> iter = endpoints.iterator();
+                ListIterator<InetAddress> iter = endpoints.listIterator();
                 while (true)
                 {
                     // send the mutation to the last-used endpoint.  first time through, this will NPE harmlessly.
+
+                    // attempt to connect to a different endpoint
                     try
                     {
-                        int i = 0;
-                        int itemId = preparedStatement(client);
-                        while (bindVariables != null)
-                        {
-                            client.execute_prepared_cql3_query(itemId, bindVariables, ConsistencyLevel.ONE);
-                            i++;
-                            
-                            if (i >= batchThreshold)
-                                break;
-                            
-                            bindVariables = queue.poll();
-                        }
-                        
-                        break;
+                        InetAddress address = iter.next();
+                        String host = address.getHostName();
+                        client = CqlConfigHelper.getOutputCluster(host, conf).connect();
                     }
                     catch (Exception e)
                     {
+                        //If connection died due to Interrupt, just try connecting to the endpoint again.
+                        if (Thread.interrupted()) {
+                            lastException = new IOException(e);
+                            iter.previous();
+                        }
                         closeInternal();
-                        if (!iter.hasNext())
+
+                        // Most exceptions mean something unexpected went wrong to that endpoint, so
+                        // we should try again to another.  Other exceptions (auth or invalid request) are fatal.
+                        if ((e instanceof AuthenticationException || e instanceof InvalidQueryException) || !iter.hasNext())
                         {
                             lastException = new IOException(e);
                             break outer;
                         }
                     }
 
-                    // attempt to connect to a different endpoint
                     try
                     {
-                        InetAddress address = iter.next();
-                        String host = address.getHostName();
-                        int port = ConfigHelper.getOutputRpcPort(conf);
-                        client = CqlOutputFormat.createAuthenticatedClient(host, port, conf);
+                        int i = 0;
+                        PreparedStatement statement = preparedStatement(client);
+                        while (bindVariables != null)
+                        {
+                            BoundStatement boundStatement = new BoundStatement(statement);
+                            for (int columnPosition = 0; columnPosition < bindVariables.size(); columnPosition++)
+                            {
+                                boundStatement.setBytesUnsafe(columnPosition, bindVariables.get(columnPosition));
+                            }
+                            client.execute(boundStatement);
+                            i++;
+                            
+                            if (i >= batchThreshold)
+                                break;
+                            bindVariables = queue.poll();
+                        }
+                        break;
                     }
                     catch (Exception e)
                     {
                         closeInternal();
-                        // TException means something unexpected went wrong to that endpoint, so
-                        // we should try again to another.  Other exceptions (auth or invalid request) are fatal.
-                        if ((!(e instanceof TException)) || !iter.hasNext())
+                        if (!iter.hasNext())
                         {
                             lastException = new IOException(e);
                             break outer;
                         }
                     }
+
                 }
             }
-
             // close all our connections once we are done.
             closeInternal();
         }
 
         /** get prepared statement id from cache, otherwise prepare it from Cassandra server*/
-        private int preparedStatement(Cassandra.Client client)
+        private PreparedStatement preparedStatement(Session client)
         {
-            Integer itemId = preparedStatements.get(client);
-            if (itemId == null)
+            PreparedStatement statement = preparedStatements.get(client);
+            if (statement == null)
             {
-                CqlPreparedResult result;
+                PreparedStatement result;
                 try
                 {
-                    result = client.prepare_cql3_query(ByteBufferUtil.bytes(cql), Compression.NONE);
+                    result = client.prepare(cql);
                 }
-                catch (TException e)
+                catch (NoHostAvailableException e)
                 {
                     throw new RuntimeException("failed to prepare cql query " + cql, e);
                 }
 
-                Integer previousId = preparedStatements.putIfAbsent(client, Integer.valueOf(result.itemId));
-                itemId = previousId == null ? result.itemId : previousId;
+                PreparedStatement previousId = preparedStatements.putIfAbsent(client, result);
+                statement = previousId == null ? result : previousId;
             }
-            return itemId;
+            return statement;
         }
-    }
 
-    private ByteBuffer getPartitionKey(Map<String, ByteBuffer> keyColumns)
-    {
-        ByteBuffer partitionKey;
-        if (keyValidator instanceof CompositeType)
+        public void close() throws IOException
         {
-            ByteBuffer[] keys = new ByteBuffer[partitionKeyColumns.length];
-            for (int i = 0; i< keys.length; i++)
-                keys[i] = keyColumns.get(partitionKeyColumns[i]);
+            // stop the run loop.  this will result in closeInternal being called by the time join() finishes.
+            run = false;
+            interrupt();
+            try
+            {
+                this.join();
+            }
+            catch (InterruptedException e)
+            {
+                throw new AssertionError(e);
+            }
 
-            partitionKey = CompositeType.build(keys);
-        }
-        else
-        {
-            partitionKey = keyColumns.get(partitionKeyColumns[0]);
+            if (lastException != null)
+                throw lastException;
         }
-        return partitionKey;
-    }
 
-    // FIXME
-    /** retrieve the key validator from system.schema_columnfamilies table */
-    private void retrievePartitionKeyValidator(Cassandra.Client client) throws Exception
-    {
-        String keyspace = ConfigHelper.getOutputKeyspace(conf);
-        String cfName = ConfigHelper.getOutputColumnFamily(conf);
-        String query = String.format("SELECT key_validator, key_aliases, column_aliases " +
-                                     "FROM %s.%s " +
-                                     "WHERE keyspace_name = '%s' and columnfamily_name = '%s'",
-                                     SystemKeyspace.NAME,
-                                     LegacySchemaTables.COLUMNFAMILIES,
-                                     keyspace,
-                                     cfName);
-        CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE);
-
-        Column rawKeyValidator = result.rows.get(0).columns.get(0);
-        String validator = ByteBufferUtil.string(ByteBuffer.wrap(rawKeyValidator.getValue()));
-        keyValidator = parseType(validator);
-        
-        Column rawPartitionKeys = result.rows.get(0).columns.get(1);
-        String keyString = ByteBufferUtil.string(ByteBuffer.wrap(rawPartitionKeys.getValue()));
-        logger.debug("partition keys: {}", keyString);
-
-        List<String> keys = FBUtilities.fromJsonList(keyString);
-        partitionKeyColumns = new String[keys.size()];
-        int i = 0;
-        for (String key : keys)
+
+        protected void closeInternal()
         {
-            partitionKeyColumns[i] = key;
-            i++;
+            if (client != null)
+            {
+                client.close();
+            }
         }
-
-        Column rawClusterColumns = result.rows.get(0).columns.get(2);
-        String clusterColumnString = ByteBufferUtil.string(ByteBuffer.wrap(rawClusterColumns.getValue()));
-
-        logger.debug("cluster columns: {}", clusterColumnString);
-        clusterColumns = FBUtilities.fromJsonList(clusterColumnString);
     }
 
-    private AbstractType<?> parseType(String type) throws ConfigurationException
+    private ByteBuffer getPartitionKey(Map<String, ByteBuffer> keyColumns)
     {
-        try
+        ByteBuffer partitionKey;
+        if (partitionKeyColumns.size() > 1)
         {
-            // always treat counters like longs, specifically CCT.serialize is not what we need
-            if (type != null && type.equals("org.apache.cassandra.db.marshal.CounterColumnType"))
-                return LongType.instance;
-            return TypeParser.parse(type);
+            ByteBuffer[] keys = new ByteBuffer[partitionKeyColumns.size()];
+            for (int i = 0; i < keys.length; i++)
+                keys[i] = keyColumns.get(partitionKeyColumns.get(i).getName());
+
+            partitionKey = CompositeType.build(keys);
         }
-        catch (SyntaxException e)
+        else
         {
-            throw new ConfigurationException(e.getMessage(), e);
+            partitionKey = keyColumns.get(partitionKeyColumns.get(0).getName());
         }
+        return partitionKey;
     }
 
     /**
@@ -393,10 +447,10 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB
     {
         String keyWhereClause = "";
 
-        for (String partitionKey : partitionKeyColumns)
-            keyWhereClause += String.format("%s = ?", keyWhereClause.isEmpty() ? quote(partitionKey) : (" AND " + quote(partitionKey)));
-        for (String clusterColumn : clusterColumns)
-            keyWhereClause += " AND " + quote(clusterColumn) + " = ?";
+        for (ColumnMetadata partitionKey : partitionKeyColumns)
+            keyWhereClause += String.format("%s = ?", keyWhereClause.isEmpty() ? quote(partitionKey.getName()) : (" AND " + quote(partitionKey.getName())));
+        for (ColumnMetadata clusterColumn : clusterColumns)
+            keyWhereClause += " AND " + quote(clusterColumn.getName()) + " = ?";
 
         return cqlQuery + " WHERE " + keyWhereClause;
     }
@@ -406,4 +460,60 @@ class CqlRecordWriter extends AbstractColumnFamilyRecordWriter<Map<String, ByteB
     {
         return "\"" + identifier.replaceAll("\"", "\"\"") + "\"";
     }
+
+    class NativeRingCache
+    {
+        private Map<TokenRange, Set<Host>> rangeMap;
+        private Metadata metadata;
+        private final IPartitioner partitioner;
+        private final Configuration conf;
+
+        public NativeRingCache(Configuration conf)
+        {
+            this.conf = conf;
+            this.partitioner = ConfigHelper.getOutputPartitioner(conf);
+            refreshEndpointMap();
+        }
+
+
+        private void refreshEndpointMap()
+        {
+            String keyspace = ConfigHelper.getOutputKeyspace(conf);
+            Session session = CqlConfigHelper.getOutputCluster(ConfigHelper.getOutputInitialAddress(conf), conf).connect(keyspace);
+            rangeMap = new HashMap<>();
+            metadata = session.getCluster().getMetadata();
+            Set<TokenRange> ranges = metadata.getTokenRanges();
+            for (TokenRange range : ranges)
+            {
+                rangeMap.put(range, metadata.getReplicas(keyspace, range));
+            }
+        }
+
+        public TokenRange getRange(ByteBuffer key)
+        {
+            Token t = partitioner.getToken(key);
+            com.datastax.driver.core.Token driverToken = metadata.newToken(partitioner.getTokenFactory().toString(t));
+            for (TokenRange range : rangeMap.keySet())
+            {
+                if (range.contains(driverToken))
+                {
+                    return range;
+                }
+            }
+
+            throw new RuntimeException("Invalid token information returned by cluster metadata: " + rangeMap);
+        }
+
+        public List<InetAddress> getEndpoints(TokenRange range)
+        {
+            Set<Host> hostSet = rangeMap.get(range);
+            List<Host> hosts = new ArrayList<>(hostSet);
+            List<InetAddress> addresses = new ArrayList<>(hosts.size());
+            for (Host host: hosts)
+            {
+                addresses.add(host.getAddress());
+            }
+            return addresses;
+        }
+    }
 }


[2/4] cassandra git commit: Remove Thrift dependencies in bundled tools

Posted by al...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
index 0a64c87..1ad80b7 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
@@ -18,30 +18,46 @@
 package org.apache.cassandra.hadoop.pig;
 
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.util.*;
 
-import org.apache.cassandra.hadoop.HadoopCompat;
-import org.apache.cassandra.db.Cell;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.auth.PasswordAuthenticator;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.Cell;
 import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.hadoop.*;
+import org.apache.cassandra.exceptions.SyntaxException;
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.hadoop.HadoopCompat;
+import org.apache.cassandra.schema.LegacySchemaTables;
+import org.apache.cassandra.serializers.CollectionSerializer;
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Hex;
+import org.apache.cassandra.utils.UUIDGen;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.*;
 import org.apache.pig.Expression;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
 import org.apache.pig.ResourceSchema;
-import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.*;
 import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
@@ -52,7 +68,8 @@ import org.apache.thrift.protocol.TBinaryProtocol;
  *
  * A row from a standard CF will be returned as nested tuples: (key, ((name1, val1), (name2, val2))).
  */
-public class CassandraStorage extends AbstractCassandraStorage
+@Deprecated
+public class CassandraStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata
 {
     public final static String PIG_ALLOW_DELETES = "PIG_ALLOW_DELETES";
     public final static String PIG_WIDEROW_INPUT = "PIG_WIDEROW_INPUT";
@@ -71,6 +88,28 @@ public class CassandraStorage extends AbstractCassandraStorage
 
     private boolean widerows = false;
     private int limit;
+
+    protected String DEFAULT_INPUT_FORMAT;
+    protected String DEFAULT_OUTPUT_FORMAT;
+
+    protected enum MarshallerType { COMPARATOR, DEFAULT_VALIDATOR, KEY_VALIDATOR, SUBCOMPARATOR };
+
+    protected String username;
+    protected String password;
+    protected String keyspace;
+    protected String column_family;
+    protected String loadSignature;
+    protected String storeSignature;
+
+    protected Configuration conf;
+    protected String inputFormatClass;
+    protected String outputFormatClass;
+    protected int splitSize = 64 * 1024;
+    protected String partitionerClass;
+    protected boolean usePartitionFilter = false;
+    protected String initHostAddress;
+    protected String rpcPort;
+    protected int nativeProtocolVersion = 1;
     
     // wide row hacks
     private ByteBuffer lastKey;
@@ -104,8 +143,7 @@ public class CassandraStorage extends AbstractCassandraStorage
     /** read wide row*/
     public Tuple getNextWide() throws IOException
     {
-        CfInfo cfInfo = getCfInfo(loadSignature);
-        CfDef cfDef = cfInfo.cfDef;
+        CfDef cfDef = getCfDef(loadSignature);
         ByteBuffer key = null;
         Tuple tuple = null; 
         DefaultDataBag bag = new DefaultDataBag();
@@ -128,7 +166,7 @@ public class CassandraStorage extends AbstractCassandraStorage
                         }
                         for (Map.Entry<ByteBuffer, Cell> entry : lastRow.entrySet())
                         {
-                            bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
+                            bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
                         }
                         lastKey = null;
                         lastRow = null;
@@ -166,7 +204,7 @@ public class CassandraStorage extends AbstractCassandraStorage
                             addKeyToTuple(tuple, lastKey, cfDef, parseType(cfDef.getKey_validation_class()));
                         for (Map.Entry<ByteBuffer, Cell> entry : lastRow.entrySet())
                         {
-                            bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
+                            bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
                         }
                         tuple.append(bag);
                         lastKey = key;
@@ -183,14 +221,14 @@ public class CassandraStorage extends AbstractCassandraStorage
                 {
                     for (Map.Entry<ByteBuffer, Cell> entry : lastRow.entrySet())
                     {
-                        bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
+                        bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
                     }
                     lastKey = null;
                     lastRow = null;
                 }
                 for (Map.Entry<ByteBuffer, Cell> entry : row.entrySet())
                 {
-                    bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
+                    bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
                 }
             }
         }
@@ -200,7 +238,6 @@ public class CassandraStorage extends AbstractCassandraStorage
         }
     }
 
-    @Override
     /** read next row */
     public Tuple getNext() throws IOException
     {
@@ -212,8 +249,7 @@ public class CassandraStorage extends AbstractCassandraStorage
             if (!reader.nextKeyValue())
                 return null;
 
-            CfInfo cfInfo = getCfInfo(loadSignature);
-            CfDef cfDef = cfInfo.cfDef;
+            CfDef cfDef = getCfDef(loadSignature);
             ByteBuffer key = reader.getCurrentKey();
             Map<ByteBuffer, Cell> cf = reader.getCurrentValue();
             assert key != null && cf != null;
@@ -240,7 +276,7 @@ public class CassandraStorage extends AbstractCassandraStorage
                 }
                 if (hasColumn)
                 {
-                    tuple.append(columnToTuple(cf.get(cdef.name), cfInfo, parseType(cfDef.getComparator_type())));
+                    tuple.append(columnToTuple(cf.get(cdef.name), cfDef, parseType(cfDef.getComparator_type())));
                 }
                 else if (!cql3Table)
                 {   // otherwise, we need to add an empty tuple to take its place
@@ -252,7 +288,7 @@ public class CassandraStorage extends AbstractCassandraStorage
             for (Map.Entry<ByteBuffer, Cell> entry : cf.entrySet())
             {
                 if (!added.containsKey(entry.getKey()))
-                    bag.add(columnToTuple(entry.getValue(), cfInfo, parseType(cfDef.getComparator_type())));
+                    bag.add(columnToTuple(entry.getValue(), cfDef, parseType(cfDef.getComparator_type())));
             }
             tuple.append(bag);
             // finally, special top-level indexes if needed
@@ -260,7 +296,7 @@ public class CassandraStorage extends AbstractCassandraStorage
             {
                 for (ColumnDef cdef : getIndexes())
                 {
-                    Tuple throwaway = columnToTuple(cf.get(cdef.name), cfInfo, parseType(cfDef.getComparator_type()));
+                    Tuple throwaway = columnToTuple(cf.get(cdef.name), cfDef, parseType(cfDef.getComparator_type()));
                     tuple.append(throwaway.get(1));
                 }
             }
@@ -272,14 +308,57 @@ public class CassandraStorage extends AbstractCassandraStorage
         }
     }
 
+    /** write next row */
+    public void putNext(Tuple t) throws IOException
+    {
+        /*
+        We support two cases for output:
+        First, the original output:
+            (key, (name, value), (name,value), {(name,value)}) (tuples or bag is optional)
+        For supers, we only accept the original output.
+        */
+
+        if (t.size() < 1)
+        {
+            // simply nothing here, we can't even delete without a key
+            logger.warn("Empty output skipped, filter empty tuples to suppress this warning");
+            return;
+        }
+        ByteBuffer key = objToBB(t.get(0));
+        if (t.getType(1) == DataType.TUPLE)
+            writeColumnsFromTuple(key, t, 1);
+        else if (t.getType(1) == DataType.BAG)
+        {
+            if (t.size() > 2)
+                throw new IOException("No arguments allowed after bag");
+            writeColumnsFromBag(key, (DataBag) t.get(1));
+        }
+        else
+            throw new IOException("Second argument in output must be a tuple or bag");
+    }
+
     /** set hadoop cassandra connection settings */
     protected void setConnectionInformation() throws IOException
     {
-        super.setConnectionInformation();
+        StorageHelper.setConnectionInformation(conf);
+        if (System.getenv(StorageHelper.PIG_INPUT_FORMAT) != null)
+            inputFormatClass = getFullyQualifiedClassName(System.getenv(StorageHelper.PIG_INPUT_FORMAT));
+        else
+            inputFormatClass = DEFAULT_INPUT_FORMAT;
+        if (System.getenv(StorageHelper.PIG_OUTPUT_FORMAT) != null)
+            outputFormatClass = getFullyQualifiedClassName(System.getenv(StorageHelper.PIG_OUTPUT_FORMAT));
+        else
+            outputFormatClass = DEFAULT_OUTPUT_FORMAT;
         if (System.getenv(PIG_ALLOW_DELETES) != null)
             allow_deletes = Boolean.parseBoolean(System.getenv(PIG_ALLOW_DELETES));
     }
 
+    /** get the full class name */
+    protected String getFullyQualifiedClassName(String classname)
+    {
+        return classname.contains(".") ? classname : "org.apache.cassandra.hadoop." + classname;
+    }
+
     /** set read configuration settings */
     public void setLocation(String location, Job job) throws IOException
     {
@@ -296,11 +375,11 @@ public class CassandraStorage extends AbstractCassandraStorage
             widerows = Boolean.parseBoolean(System.getenv(PIG_WIDEROW_INPUT));
         if (System.getenv(PIG_USE_SECONDARY) != null)
             usePartitionFilter = Boolean.parseBoolean(System.getenv(PIG_USE_SECONDARY));
-        if (System.getenv(PIG_INPUT_SPLIT_SIZE) != null)
+        if (System.getenv(StorageHelper.PIG_INPUT_SPLIT_SIZE) != null)
         {
             try
             {
-                ConfigHelper.setInputSplitSize(conf, Integer.parseInt(System.getenv(PIG_INPUT_SPLIT_SIZE)));
+                ConfigHelper.setInputSplitSize(conf, Integer.parseInt(System.getenv(StorageHelper.PIG_INPUT_SPLIT_SIZE)));
             }
             catch (NumberFormatException e)
             {
@@ -380,12 +459,67 @@ public class CassandraStorage extends AbstractCassandraStorage
         initSchema(storeSignature);
     }
 
+    /** Methods to get the column family schema from Cassandra */
+    protected void initSchema(String signature) throws IOException
+    {
+        Properties properties = UDFContext.getUDFContext().getUDFProperties(CassandraStorage.class);
+
+        // Only get the schema if we haven't already gotten it
+        if (!properties.containsKey(signature))
+        {
+            try
+            {
+                Cassandra.Client client = ConfigHelper.getClientFromInputAddressList(conf);
+                client.set_keyspace(keyspace);
+
+                if (username != null && password != null)
+                {
+                    Map<String, String> credentials = new HashMap<String, String>(2);
+                    credentials.put(PasswordAuthenticator.USERNAME_KEY, username);
+                    credentials.put(PasswordAuthenticator.PASSWORD_KEY, password);
+
+                    try
+                    {
+                        client.login(new AuthenticationRequest(credentials));
+                    }
+                    catch (AuthenticationException e)
+                    {
+                        logger.error("Authentication exception: invalid username and/or password");
+                        throw new IOException(e);
+                    }
+                }
+
+                // compose the CfDef for the columnfamily
+                CfDef cfDef = getCfDef(client);
+
+                if (cfDef != null)
+                {
+                    StringBuilder sb = new StringBuilder();
+                    sb.append(cfdefToString(cfDef));
+                    properties.setProperty(signature, sb.toString());
+                }
+                else
+                    throw new IOException(String.format("Table '%s' not found in keyspace '%s'",
+                            column_family,
+                            keyspace));
+            }
+            catch (Exception e)
+            {
+                throw new IOException(e);
+            }
+        }
+    }
+
+    public void checkSchema(ResourceSchema schema) throws IOException
+    {
+        // we don't care about types, they all get casted to ByteBuffers
+    }
+
     /** define the schema */
     public ResourceSchema getSchema(String location, Job job) throws IOException
     {
         setLocation(location, job);
-        CfInfo cfInfo = getCfInfo(loadSignature);
-        CfDef cfDef = cfInfo.cfDef;
+        CfDef cfDef = getCfDef(loadSignature);
         if (cfDef.column_type.equals("Super"))
             return null;
         /*
@@ -405,7 +539,7 @@ public class CassandraStorage extends AbstractCassandraStorage
         // add key
         ResourceFieldSchema keyFieldSchema = new ResourceFieldSchema();
         keyFieldSchema.setName("key");
-        keyFieldSchema.setType(getPigType(marshallers.get(MarshallerType.KEY_VALIDATOR)));
+        keyFieldSchema.setType(StorageHelper.getPigType(marshallers.get(MarshallerType.KEY_VALIDATOR)));
 
         ResourceSchema bagSchema = new ResourceSchema();
         ResourceFieldSchema bagField = new ResourceFieldSchema();
@@ -419,8 +553,8 @@ public class CassandraStorage extends AbstractCassandraStorage
         ResourceFieldSchema bagvalSchema = new ResourceFieldSchema();
         bagcolSchema.setName("name");
         bagvalSchema.setName("value");
-        bagcolSchema.setType(getPigType(marshallers.get(MarshallerType.COMPARATOR)));
-        bagvalSchema.setType(getPigType(marshallers.get(MarshallerType.DEFAULT_VALIDATOR)));
+        bagcolSchema.setType(StorageHelper.getPigType(marshallers.get(MarshallerType.COMPARATOR)));
+        bagvalSchema.setType(StorageHelper.getPigType(marshallers.get(MarshallerType.DEFAULT_VALIDATOR)));
         bagTupleSchema.setFields(new ResourceFieldSchema[] { bagcolSchema, bagvalSchema });
         bagTupleField.setSchema(bagTupleSchema);
         bagSchema.setFields(new ResourceFieldSchema[] { bagTupleField });
@@ -431,7 +565,7 @@ public class CassandraStorage extends AbstractCassandraStorage
         // add the key first, then the indexed columns, and finally the bag
         allSchemaFields.add(keyFieldSchema);
 
-        if (!widerows && (cfInfo.compactCqlTable || !cfInfo.cql3Table))
+        if (!widerows)
         {
             // defined validators/indexes
             for (ColumnDef cdef : cfDef.column_metadata)
@@ -445,14 +579,14 @@ public class CassandraStorage extends AbstractCassandraStorage
 
                 ResourceFieldSchema idxColSchema = new ResourceFieldSchema();
                 idxColSchema.setName("name");
-                idxColSchema.setType(getPigType(marshallers.get(MarshallerType.COMPARATOR)));
+                idxColSchema.setType(StorageHelper.getPigType(marshallers.get(MarshallerType.COMPARATOR)));
 
                 ResourceFieldSchema valSchema = new ResourceFieldSchema();
                 AbstractType validator = validators.get(cdef.name);
                 if (validator == null)
                     validator = marshallers.get(MarshallerType.DEFAULT_VALIDATOR);
                 valSchema.setName("value");
-                valSchema.setType(getPigType(validator));
+                valSchema.setType(StorageHelper.getPigType(validator));
 
                 innerTupleSchema.setFields(new ResourceFieldSchema[] { idxColSchema, valSchema });
                 allSchemaFields.add(innerTupleField);
@@ -472,7 +606,7 @@ public class CassandraStorage extends AbstractCassandraStorage
                 AbstractType validator = validators.get(cdef.name);
                 if (validator == null)
                     validator = marshallers.get(MarshallerType.DEFAULT_VALIDATOR);
-                idxSchema.setType(getPigType(validator));
+                idxSchema.setType(StorageHelper.getPigType(validator));
                 allSchemaFields.add(idxSchema);
             }
         }
@@ -485,8 +619,8 @@ public class CassandraStorage extends AbstractCassandraStorage
     public void setPartitionFilter(Expression partitionFilter) throws IOException
     {
         UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(AbstractCassandraStorage.class);
-        property.setProperty(PARTITION_FILTER_SIGNATURE, indexExpressionsToString(filterToIndexExpressions(partitionFilter)));
+        Properties property = context.getUDFProperties(CassandraStorage.class);
+        property.setProperty(StorageHelper.PARTITION_FILTER_SIGNATURE, indexExpressionsToString(filterToIndexExpressions(partitionFilter)));
     }
 
     /** prepare writer */
@@ -495,33 +629,93 @@ public class CassandraStorage extends AbstractCassandraStorage
         this.writer = writer;
     }
 
-    /** write next row */
-    public void putNext(Tuple t) throws IOException
+    /** convert object to ByteBuffer */
+    protected ByteBuffer objToBB(Object o)
     {
-        /*
-        We support two cases for output:
-        First, the original output:
-            (key, (name, value), (name,value), {(name,value)}) (tuples or bag is optional)
-        For supers, we only accept the original output.
-        */
+        if (o == null)
+            return nullToBB();
+        if (o instanceof java.lang.String)
+            return ByteBuffer.wrap(new DataByteArray((String)o).get());
+        if (o instanceof Integer)
+            return Int32Type.instance.decompose((Integer)o);
+        if (o instanceof Long)
+            return LongType.instance.decompose((Long)o);
+        if (o instanceof Float)
+            return FloatType.instance.decompose((Float)o);
+        if (o instanceof Double)
+            return DoubleType.instance.decompose((Double)o);
+        if (o instanceof UUID)
+            return ByteBuffer.wrap(UUIDGen.decompose((UUID) o));
+        if (o instanceof Tuple) {
+            List<Object> objects = ((Tuple)o).getAll();
+            //collections
+            if (objects.size() > 0 && objects.get(0) instanceof String)
+            {
+                String collectionType = (String) objects.get(0);
+                if ("set".equalsIgnoreCase(collectionType) ||
+                        "list".equalsIgnoreCase(collectionType))
+                    return objToListOrSetBB(objects.subList(1, objects.size()));
+                else if ("map".equalsIgnoreCase(collectionType))
+                    return objToMapBB(objects.subList(1, objects.size()));
 
-        if (t.size() < 1)
+            }
+            return objToCompositeBB(objects);
+        }
+
+        return ByteBuffer.wrap(((DataByteArray) o).get());
+    }
+
+    private ByteBuffer objToListOrSetBB(List<Object> objects)
+    {
+        List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
+        for(Object sub : objects)
         {
-            // simply nothing here, we can't even delete without a key
-            logger.warn("Empty output skipped, filter empty tuples to suppress this warning");
-            return;
+            ByteBuffer buffer = objToBB(sub);
+            serialized.add(buffer);
         }
-        ByteBuffer key = objToBB(t.get(0));
-        if (t.getType(1) == DataType.TUPLE)
-            writeColumnsFromTuple(key, t, 1);
-        else if (t.getType(1) == DataType.BAG)
+        // NOTE: using protocol v1 serialization format for collections so as to not break
+        // compatibility. Not sure if that's the right thing.
+        return CollectionSerializer.pack(serialized, objects.size(), 1);
+    }
+
+    private ByteBuffer objToMapBB(List<Object> objects)
+    {
+        List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size() * 2);
+        for(Object sub : objects)
         {
-            if (t.size() > 2)
-                throw new IOException("No arguments allowed after bag");
-            writeColumnsFromBag(key, (DataBag) t.get(1));
+            List<Object> keyValue = ((Tuple)sub).getAll();
+            for (Object entry: keyValue)
+            {
+                ByteBuffer buffer = objToBB(entry);
+                serialized.add(buffer);
+            }
         }
-        else
-            throw new IOException("Second argument in output must be a tuple or bag");
+        // NOTE: using protocol v1 serialization format for collections so as to not break
+        // compatibility. Not sure if that's the right thing.
+        return CollectionSerializer.pack(serialized, objects.size(), 1);
+    }
+
+    private ByteBuffer objToCompositeBB(List<Object> objects)
+    {
+        List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
+        int totalLength = 0;
+        for(Object sub : objects)
+        {
+            ByteBuffer buffer = objToBB(sub);
+            serialized.add(buffer);
+            totalLength += 2 + buffer.remaining() + 1;
+        }
+        ByteBuffer out = ByteBuffer.allocate(totalLength);
+        for (ByteBuffer bb : serialized)
+        {
+            int length = bb.remaining();
+            out.put((byte) ((length >> 8) & 0xFF));
+            out.put((byte) (length & 0xFF));
+            out.put(bb);
+            out.put((byte) 0);
+        }
+        out.flip();
+        return out;
     }
 
     /** write tuple data to cassandra */
@@ -643,6 +837,19 @@ public class CassandraStorage extends AbstractCassandraStorage
         }
     }
 
+    /** get a list of columns with defined index*/
+    protected List<ColumnDef> getIndexes() throws IOException
+    {
+        CfDef cfdef = getCfDef(loadSignature);
+        List<ColumnDef> indexes = new ArrayList<ColumnDef>();
+        for (ColumnDef cdef : cfdef.column_metadata)
+        {
+            if (cdef.index_type != null)
+                indexes.add(cdef);
+        }
+        return indexes;
+    }
+
     /** get a list of Cassandra IndexExpression from Pig expression */
     private List<IndexExpression> filterToIndexExpressions(Expression expression) throws IOException
     {
@@ -713,13 +920,64 @@ public class CassandraStorage extends AbstractCassandraStorage
         return indexClause.getExpressions();
     }
 
+    public ResourceStatistics getStatistics(String location, Job job)
+    {
+        return null;
+    }
+
+    public void cleanupOnFailure(String failure, Job job)
+    {
+    }
+
+    public void cleanupOnSuccess(String location, Job job) throws IOException
+    {
+    }
+
+
+    /** StoreFunc methods */
+    public void setStoreFuncUDFContextSignature(String signature)
+    {
+        this.storeSignature = signature;
+    }
+
+    public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException
+    {
+        return relativeToAbsolutePath(location, curDir);
+    }
+
+    /** output format */
+    public OutputFormat getOutputFormat() throws IOException
+    {
+        try
+        {
+            return FBUtilities.construct(outputFormatClass, "outputformat");
+        }
+        catch (ConfigurationException e)
+        {
+            throw new IOException(e);
+        }
+    }
+
+
+    @Override
+    public InputFormat getInputFormat() throws IOException
+    {
+        try
+        {
+            return FBUtilities.construct(inputFormatClass, "inputformat");
+        }
+        catch (ConfigurationException e)
+        {
+            throw new IOException(e);
+        }
+    }
+
     /** get a list of index expression */
     private List<IndexExpression> getIndexExpressions() throws IOException
     {
         UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(AbstractCassandraStorage.class);
-        if (property.getProperty(PARTITION_FILTER_SIGNATURE) != null)
-            return indexExpressionsFromString(property.getProperty(PARTITION_FILTER_SIGNATURE));
+        Properties property = context.getUDFProperties(CassandraStorage.class);
+        if (property.getProperty(StorageHelper.PARTITION_FILTER_SIGNATURE) != null)
+            return indexExpressionsFromString(property.getProperty(StorageHelper.PARTITION_FILTER_SIGNATURE));
         else
             return null;
     }
@@ -731,6 +989,129 @@ public class CassandraStorage extends AbstractCassandraStorage
         return getColumnMeta(client, true, true);
     }
 
+
+    /** get column meta data */
+    protected List<ColumnDef> getColumnMeta(Cassandra.Client client, boolean cassandraStorage, boolean includeCompactValueColumn)
+            throws org.apache.cassandra.thrift.InvalidRequestException,
+            UnavailableException,
+            TimedOutException,
+            SchemaDisagreementException,
+            TException,
+            CharacterCodingException,
+            org.apache.cassandra.exceptions.InvalidRequestException,
+            ConfigurationException,
+            NotFoundException
+    {
+        String query = String.format("SELECT column_name, validator, index_type, type " +
+                        "FROM %s.%s " +
+                        "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'",
+                SystemKeyspace.NAME,
+                LegacySchemaTables.COLUMNS,
+                keyspace,
+                column_family);
+
+        CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE);
+
+        List<CqlRow> rows = result.rows;
+        List<ColumnDef> columnDefs = new ArrayList<ColumnDef>();
+        if (rows == null || rows.isEmpty())
+        {
+            // if CassandraStorage, just return the empty list
+            if (cassandraStorage)
+                return columnDefs;
+
+            // otherwise for CqlNativeStorage, check metadata for classic thrift tables
+            CFMetaData cfm = getCFMetaData(keyspace, column_family, client);
+            for (ColumnDefinition def : cfm.regularAndStaticColumns())
+            {
+                ColumnDef cDef = new ColumnDef();
+                String columnName = def.name.toString();
+                String type = def.type.toString();
+                logger.debug("name: {}, type: {} ", columnName, type);
+                cDef.name = ByteBufferUtil.bytes(columnName);
+                cDef.validation_class = type;
+                columnDefs.add(cDef);
+            }
+            // we may not need to include the value column for compact tables as we
+            // could have already processed it as schema_columnfamilies.value_alias
+            if (columnDefs.size() == 0 && includeCompactValueColumn && cfm.compactValueColumn() != null)
+            {
+                ColumnDefinition def = cfm.compactValueColumn();
+                if ("value".equals(def.name.toString()))
+                {
+                    ColumnDef cDef = new ColumnDef();
+                    cDef.name = def.name.bytes;
+                    cDef.validation_class = def.type.toString();
+                    columnDefs.add(cDef);
+                }
+            }
+            return columnDefs;
+        }
+
+        Iterator<CqlRow> iterator = rows.iterator();
+        while (iterator.hasNext())
+        {
+            CqlRow row = iterator.next();
+            ColumnDef cDef = new ColumnDef();
+            String type = ByteBufferUtil.string(row.getColumns().get(3).value);
+            if (!type.equals("regular"))
+                continue;
+            cDef.setName(ByteBufferUtil.clone(row.getColumns().get(0).value));
+            cDef.validation_class = ByteBufferUtil.string(row.getColumns().get(1).value);
+            ByteBuffer indexType = row.getColumns().get(2).value;
+            if (indexType != null)
+                cDef.index_type = getIndexType(ByteBufferUtil.string(indexType));
+            columnDefs.add(cDef);
+        }
+        return columnDefs;
+    }
+
+
+    /** get CFMetaData of a column family */
+    protected CFMetaData getCFMetaData(String ks, String cf, Cassandra.Client client)
+            throws NotFoundException,
+            org.apache.cassandra.thrift.InvalidRequestException,
+            TException,
+            org.apache.cassandra.exceptions.InvalidRequestException,
+            ConfigurationException
+    {
+        KsDef ksDef = client.describe_keyspace(ks);
+        for (CfDef cfDef : ksDef.cf_defs)
+        {
+            if (cfDef.name.equalsIgnoreCase(cf))
+                return ThriftConversion.fromThrift(cfDef);
+        }
+        return null;
+    }
+
+    /** get index type from string */
+    protected IndexType getIndexType(String type)
+    {
+        type = type.toLowerCase();
+        if ("keys".equals(type))
+            return IndexType.KEYS;
+        else if("custom".equals(type))
+            return IndexType.CUSTOM;
+        else if("composites".equals(type))
+            return IndexType.COMPOSITES;
+        else
+            return null;
+    }
+
+    /** return partition keys */
+    public String[] getPartitionKeys(String location, Job job) throws IOException
+    {
+        if (!usePartitionFilter)
+            return null;
+        List<ColumnDef> indexes = getIndexes();
+        String[] partitionKeys = new String[indexes.size()];
+        for (int i = 0; i < indexes.size(); i++)
+        {
+            partitionKeys[i] = new String(indexes.get(i).getName());
+        }
+        return partitionKeys;
+    }
+
     /** convert key to a tuple */
     private Tuple keyToTuple(ByteBuffer key, CfDef cfDef, AbstractType comparator) throws IOException
     {
@@ -744,15 +1125,26 @@ public class CassandraStorage extends AbstractCassandraStorage
     {
         if( comparator instanceof AbstractCompositeType )
         {
-            setTupleValue(tuple, 0, composeComposite((AbstractCompositeType)comparator,key));
+            StorageHelper.setTupleValue(tuple, 0, composeComposite((AbstractCompositeType) comparator, key));
         }
         else
         {
-            setTupleValue(tuple, 0, cassandraToObj(getDefaultMarshallers(cfDef).get(MarshallerType.KEY_VALIDATOR), key));
+            StorageHelper.setTupleValue(tuple, 0, StorageHelper.cassandraToObj(getDefaultMarshallers(cfDef).get(MarshallerType.KEY_VALIDATOR), key, nativeProtocolVersion));
         }
 
     }
 
+    /** Deconstructs a composite type to a Tuple. */
+    protected Tuple composeComposite(AbstractCompositeType comparator, ByteBuffer name) throws IOException
+    {
+        List<AbstractCompositeType.CompositeComponent> result = comparator.deconstruct(name);
+        Tuple t = TupleFactory.getInstance().newTuple(result.size());
+        for (int i=0; i<result.size(); i++)
+            StorageHelper.setTupleValue(t, i, StorageHelper.cassandraToObj(result.get(i).comparator, result.get(i).value, nativeProtocolVersion));
+
+        return t;
+    }
+
     /** cassandra://[username:password@]<keyspace>/<columnfamily>[?slice_start=<start>&slice_end=<end>
      * [&reversed=true][&limit=1][&allow_deletes=true][&widerows=true]
      * [&use_secondary=true][&comparator=<comparator>][&partitioner=<partitioner>]]*/
@@ -817,10 +1209,206 @@ public class CassandraStorage extends AbstractCassandraStorage
                     "[&init_address=<host>][&rpc_port=<port>]]': " + e.getMessage());
         }
     }
-    
+
+
+    /** decompose the query to store the parameters in a map */
+    public static Map<String, String> getQueryMap(String query) throws UnsupportedEncodingException
+    {
+        String[] params = query.split("&");
+        Map<String, String> map = new HashMap<String, String>(params.length);
+        for (String param : params)
+        {
+            String[] keyValue = param.split("=");
+            map.put(keyValue[0], URLDecoder.decode(keyValue[1], "UTF-8"));
+        }
+        return map;
+    }
+
     public ByteBuffer nullToBB()
     {
         return null;
     }
-}
 
+    /** return the CfDef for the column family */
+    protected CfDef getCfDef(Cassandra.Client client)
+            throws org.apache.cassandra.thrift.InvalidRequestException,
+            UnavailableException,
+            TimedOutException,
+            SchemaDisagreementException,
+            TException,
+            NotFoundException,
+            org.apache.cassandra.exceptions.InvalidRequestException,
+            ConfigurationException,
+            IOException
+    {
+        // get CF meta data
+        String query = String.format("SELECT type, comparator, subcomparator, default_validator, key_validator " +
+                        "FROM %s.%s " +
+                        "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'",
+                SystemKeyspace.NAME,
+                LegacySchemaTables.COLUMNFAMILIES,
+                keyspace,
+                column_family);
+
+        CqlResult result = client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE);
+
+        if (result == null || result.rows == null || result.rows.isEmpty())
+            return null;
+
+        Iterator<CqlRow> iteraRow = result.rows.iterator();
+        CfDef cfDef = new CfDef();
+        cfDef.keyspace = keyspace;
+        cfDef.name = column_family;
+        if (iteraRow.hasNext())
+        {
+            CqlRow cqlRow = iteraRow.next();
+
+            cfDef.column_type = ByteBufferUtil.string(cqlRow.columns.get(0).value);
+            cfDef.comparator_type = ByteBufferUtil.string(cqlRow.columns.get(1).value);
+            ByteBuffer subComparator = cqlRow.columns.get(2).value;
+            if (subComparator != null)
+                cfDef.subcomparator_type = ByteBufferUtil.string(subComparator);
+            cfDef.default_validation_class = ByteBufferUtil.string(cqlRow.columns.get(3).value);
+            cfDef.key_validation_class = ByteBufferUtil.string(cqlRow.columns.get(4).value);
+        }
+        cfDef.column_metadata = getColumnMetadata(client);
+        return cfDef;
+    }
+
+    /** get the columnfamily definition for the signature */
+    protected CfDef getCfDef(String signature) throws IOException
+    {
+        UDFContext context = UDFContext.getUDFContext();
+        Properties property = context.getUDFProperties(CassandraStorage.class);
+        String prop = property.getProperty(signature);
+        return cfdefFromString(prop);
+    }
+
+    /** convert string back to CfDef */
+    protected static CfDef cfdefFromString(String st) throws IOException
+    {
+        assert st != null;
+        TDeserializer deserializer = new TDeserializer(new TBinaryProtocol.Factory());
+        CfDef cfDef = new CfDef();
+        try
+        {
+            deserializer.deserialize(cfDef, Hex.hexToBytes(st));
+        }
+        catch (TException e)
+        {
+            throw new IOException(e);
+        }
+        return cfDef;
+    }
+
+    /** convert CfDef to string */
+    protected static String cfdefToString(CfDef cfDef) throws IOException
+    {
+        assert cfDef != null;
+        // this is so awful it's kind of cool!
+        TSerializer serializer = new TSerializer(new TBinaryProtocol.Factory());
+        try
+        {
+            return Hex.bytesToHex(serializer.serialize(cfDef));
+        }
+        catch (TException e)
+        {
+            throw new IOException(e);
+        }
+    }
+
+    /** parse the string to a cassandra data type */
+    protected AbstractType parseType(String type) throws IOException
+    {
+        try
+        {
+            // always treat counters like longs, specifically CCT.compose is not what we need
+            if (type != null && type.equals("org.apache.cassandra.db.marshal.CounterColumnType"))
+                return LongType.instance;
+            return TypeParser.parse(type);
+        }
+        catch (ConfigurationException e)
+        {
+            throw new IOException(e);
+        }
+        catch (SyntaxException e)
+        {
+            throw new IOException(e);
+        }
+    }
+
+    /** convert a column to a tuple */
+    protected Tuple columnToTuple(Cell col, CfDef cfDef, AbstractType comparator) throws IOException
+    {
+        Tuple pair = TupleFactory.getInstance().newTuple(2);
+
+        ByteBuffer colName = col.name().toByteBuffer();
+
+        // name
+        if(comparator instanceof AbstractCompositeType)
+            StorageHelper.setTupleValue(pair, 0, composeComposite((AbstractCompositeType) comparator, colName));
+        else
+            StorageHelper.setTupleValue(pair, 0, StorageHelper.cassandraToObj(comparator, colName, nativeProtocolVersion));
+
+        // value
+        Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
+        if (validators.get(colName) == null)
+        {
+            Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
+            StorageHelper.setTupleValue(pair, 1, StorageHelper.cassandraToObj(marshallers.get(MarshallerType.DEFAULT_VALIDATOR), col.value(), nativeProtocolVersion));
+        }
+        else
+            StorageHelper.setTupleValue(pair, 1, StorageHelper.cassandraToObj(validators.get(colName), col.value(), nativeProtocolVersion));
+        return pair;
+    }
+
+    /** construct a map from marshaller type to cassandra data type */
+    protected Map<MarshallerType, AbstractType> getDefaultMarshallers(CfDef cfDef) throws IOException
+    {
+        Map<MarshallerType, AbstractType> marshallers = new EnumMap<MarshallerType, AbstractType>(MarshallerType.class);
+        AbstractType comparator;
+        AbstractType subcomparator;
+        AbstractType default_validator;
+        AbstractType key_validator;
+
+        comparator = parseType(cfDef.getComparator_type());
+        subcomparator = parseType(cfDef.getSubcomparator_type());
+        default_validator = parseType(cfDef.getDefault_validation_class());
+        key_validator = parseType(cfDef.getKey_validation_class());
+
+        marshallers.put(MarshallerType.COMPARATOR, comparator);
+        marshallers.put(MarshallerType.DEFAULT_VALIDATOR, default_validator);
+        marshallers.put(MarshallerType.KEY_VALIDATOR, key_validator);
+        marshallers.put(MarshallerType.SUBCOMPARATOR, subcomparator);
+        return marshallers;
+    }
+
+    /** get the validators */
+    protected Map<ByteBuffer, AbstractType> getValidatorMap(CfDef cfDef) throws IOException
+    {
+        Map<ByteBuffer, AbstractType> validators = new HashMap<ByteBuffer, AbstractType>();
+        for (ColumnDef cd : cfDef.getColumn_metadata())
+        {
+            if (cd.getValidation_class() != null && !cd.getValidation_class().isEmpty())
+            {
+                AbstractType validator = null;
+                try
+                {
+                    validator = TypeParser.parse(cd.getValidation_class());
+                    if (validator instanceof CounterColumnType)
+                        validator = LongType.instance;
+                    validators.put(cd.name, validator);
+                }
+                catch (ConfigurationException e)
+                {
+                    throw new IOException(e);
+                }
+                catch (SyntaxException e)
+                {
+                    throw new IOException(e);
+                }
+            }
+        }
+        return validators;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
index 7887085..91cdd02 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
@@ -17,48 +17,78 @@
  */
 package org.apache.cassandra.hadoop.pig;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
 import java.nio.ByteBuffer;
-import java.nio.charset.CharacterCodingException;
 import java.util.*;
 
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TableMetadata;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
 import org.apache.cassandra.db.BufferCell;
 import org.apache.cassandra.db.Cell;
 import org.apache.cassandra.db.composites.CellNames;
 import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.exceptions.AuthenticationException;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.SyntaxException;
 import org.apache.cassandra.hadoop.ConfigHelper;
 import org.apache.cassandra.hadoop.HadoopCompat;
 import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
 import org.apache.cassandra.hadoop.cql3.CqlRecordReader;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.serializers.CollectionSerializer;
 import org.apache.cassandra.utils.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.*;
-import org.apache.pig.Expression;
-import org.apache.pig.ResourceSchema;
+import org.apache.pig.*;
 import org.apache.pig.Expression.OpType;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.*;
 import org.apache.pig.impl.util.UDFContext;
-import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.external.biz.base64Coder.Base64Coder;
 
-import com.datastax.driver.core.Row;
 
-public class CqlNativeStorage extends AbstractCassandraStorage
+public class CqlNativeStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata
 {
+    protected String DEFAULT_INPUT_FORMAT;
+    protected String DEFAULT_OUTPUT_FORMAT;
+
+    protected String username;
+    protected String password;
+    protected String keyspace;
+    protected String column_family;
+    protected String loadSignature;
+    protected String storeSignature;
+
+    protected Configuration conf;
+    protected String inputFormatClass;
+    protected String outputFormatClass;
+    protected int splitSize = 64 * 1024;
+    protected String partitionerClass;
+    protected boolean usePartitionFilter = false;
+    protected String initHostAddress;
+    protected String rpcPort;
+    protected int nativeProtocolVersion = 1;
+
     private static final Logger logger = LoggerFactory.getLogger(CqlNativeStorage.class);
     private int pageSize = 1000;
     private String columns;
     private String outputQuery;
     private String whereClause;
-    private boolean hasCompactValueAlias = false;
 
     private RecordReader<Long, Row> reader;
     private RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>> writer;
@@ -119,21 +149,20 @@ public class CqlNativeStorage extends AbstractCassandraStorage
             if (!reader.nextKeyValue())
                 return null;
 
-            CfInfo cfInfo = getCfInfo(loadSignature);
-            CfDef cfDef = cfInfo.cfDef;
+            TableInfo tableMetadata = getCfInfo(loadSignature);
             Row row = reader.getCurrentValue();
-            Tuple tuple = TupleFactory.getInstance().newTuple(cfDef.column_metadata.size());
-            Iterator<ColumnDef> itera = cfDef.column_metadata.iterator();
+            Tuple tuple = TupleFactory.getInstance().newTuple(tableMetadata.getColumns().size());
+            Iterator<ColumnInfo> itera = tableMetadata.getColumns().iterator();
             int i = 0;
             while (itera.hasNext())
             {
-                ColumnDef cdef = itera.next();
-                ByteBuffer columnValue = row.getBytesUnsafe(ByteBufferUtil.string(cdef.name.duplicate()));
+                ColumnInfo cdef = itera.next();
+                ByteBuffer columnValue = row.getBytesUnsafe(cdef.getName());
                 if (columnValue != null)
                 {
-                    Cell cell = new BufferCell(CellNames.simpleDense(cdef.name), columnValue);
-                    AbstractType<?> validator = getValidatorMap(cfDef).get(cdef.name);
-                    setTupleValue(tuple, i, cqlColumnToObj(cell, cfDef), validator);
+                    Cell cell = new BufferCell(CellNames.simpleDense(ByteBufferUtil.bytes(cdef.getName())), columnValue);
+                    AbstractType<?> validator = getValidatorMap(tableMetadata).get(ByteBufferUtil.bytes(cdef.getName()));
+                    setTupleValue(tuple, i, cqlColumnToObj(cell, tableMetadata), validator);
                 }
                 else
                     tuple.set(i, null);
@@ -148,15 +177,12 @@ public class CqlNativeStorage extends AbstractCassandraStorage
     }
 
     /** convert a cql column to an object */
-    private Object cqlColumnToObj(Cell col, CfDef cfDef) throws IOException
+    private Object cqlColumnToObj(Cell col, TableInfo cfDef) throws IOException
     {
         // standard
         Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
         ByteBuffer cellName = col.name().toByteBuffer();
-        if (validators.get(cellName) == null)
-            return cassandraToObj(getDefaultMarshallers(cfDef).get(MarshallerType.DEFAULT_VALIDATOR), col.value());
-        else
-            return cassandraToObj(validators.get(cellName), col.value());
+        return StorageHelper.cassandraToObj(validators.get(cellName), col.value(), nativeProtocolVersion);
     }
 
     /** set the value to the position of the tuple */
@@ -165,7 +191,7 @@ public class CqlNativeStorage extends AbstractCassandraStorage
         if (validator instanceof CollectionType)
             setCollectionTupleValues(tuple, position, value, validator);
         else
-           setTupleValue(tuple, position, value);
+           StorageHelper.setTupleValue(tuple, position, value);
     }
 
     /** set the values of set/list at and after the position of the tuple */
@@ -220,173 +246,33 @@ public class CqlNativeStorage extends AbstractCassandraStorage
         return obj;
     }
 
-    /** include key columns */
-    protected List<ColumnDef> getColumnMetadata(Cassandra.Client client)
-            throws InvalidRequestException,
-            UnavailableException,
-            TimedOutException,
-            SchemaDisagreementException,
-            TException,
-            CharacterCodingException,
-            org.apache.cassandra.exceptions.InvalidRequestException,
-            ConfigurationException,
-            NotFoundException
-    {
-        List<ColumnDef> keyColumns = null;
-        // get key columns
+    /** get the cached table definition (TableInfo) for the given signature */
+    protected TableInfo getCfInfo(String signature) throws IOException
+    {
+        UDFContext context = UDFContext.getUDFContext();
+        Properties property = context.getUDFProperties(CqlNativeStorage.class);
+        TableInfo cfInfo;
         try
         {
-            keyColumns = getKeysMeta(client);
+            cfInfo = cfdefFromString(property.getProperty(signature));
         }
-        catch(Exception e)
+        catch (ClassNotFoundException e)
         {
-            logger.error("Error in retrieving key columns" , e);
+            throw new IOException(e);
         }
-
-        // get other columns
-        List<ColumnDef> columns = getColumnMeta(client, false, !hasCompactValueAlias);
-
-        // combine all columns in a list
-        if (keyColumns != null && columns != null)
-            keyColumns.addAll(columns);
-
-        return keyColumns;
+        return cfInfo;
     }
 
-    /** get keys meta data */
-    private List<ColumnDef> getKeysMeta(Cassandra.Client client)
-            throws Exception
+    /** return the driver TableMetadata for the column family */
+    protected TableMetadata getCfInfo(Session client)
+            throws NoHostAvailableException,
+            AuthenticationException,
+            IllegalStateException
     {
-        String query = "SELECT key_aliases, " +
-                "       column_aliases, " +
-                "       key_validator, " +
-                "       comparator, " +
-                "       keyspace_name, " +
-                "       value_alias, " +
-                "       default_validator " +
-                "FROM system.schema_columnfamilies " +
-                "WHERE keyspace_name = '%s'" +
-                "  AND columnfamily_name = '%s' ";
-
-        CqlResult result = client.execute_cql3_query(
-                ByteBufferUtil.bytes(String.format(query, keyspace, column_family)),
-                Compression.NONE,
-                ConsistencyLevel.ONE);
-
-        if (result == null || result.rows == null || result.rows.isEmpty())
-            return null;
-
-        Iterator<CqlRow> iteraRow = result.rows.iterator();
-        List<ColumnDef> keys = new ArrayList<ColumnDef>();
-        if (iteraRow.hasNext())
-        {
-            CqlRow cqlRow = iteraRow.next();
-            String name = ByteBufferUtil.string(cqlRow.columns.get(4).value);
-            logger.debug("Found ksDef name: {}", name);
-            String keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(0).getValue()));
-
-            logger.debug("partition keys: {}", keyString);
-            List<String> keyNames = FBUtilities.fromJsonList(keyString);
-
-            Iterator<String> iterator = keyNames.iterator();
-            while (iterator.hasNext())
-            {
-                ColumnDef cDef = new ColumnDef();
-                cDef.name = ByteBufferUtil.bytes(iterator.next());
-                keys.add(cDef);
-            }
-            // classic thrift tables
-            if (keys.size() == 0)
-            {
-                CFMetaData cfm = getCFMetaData(keyspace, column_family, client);
-                for (ColumnDefinition def : cfm.partitionKeyColumns())
-                {
-                    String key = def.name.toString();
-                    logger.debug("name: {} ", key);
-                    ColumnDef cDef = new ColumnDef();
-                    cDef.name = ByteBufferUtil.bytes(key);
-                    keys.add(cDef);
-                }
-                for (ColumnDefinition def : cfm.clusteringColumns())
-                {
-                    String key = def.name.toString();
-                    logger.debug("name: {} ", key);
-                    ColumnDef cDef = new ColumnDef();
-                    cDef.name = ByteBufferUtil.bytes(key);
-                    keys.add(cDef);
-                }
-            }
-
-            keyString = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(1).getValue()));
-
-            logger.debug("cluster keys: {}", keyString);
-            keyNames = FBUtilities.fromJsonList(keyString);
-
-            iterator = keyNames.iterator();
-            while (iterator.hasNext())
-            {
-                ColumnDef cDef = new ColumnDef();
-                cDef.name = ByteBufferUtil.bytes(iterator.next());
-                keys.add(cDef);
-            }
-
-            String validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(2).getValue()));
-            logger.debug("row key validator: {}", validator);
-            AbstractType<?> keyValidator = parseType(validator);
-
-            Iterator<ColumnDef> keyItera = keys.iterator();
-            if (keyValidator instanceof CompositeType)
-            {
-                Iterator<AbstractType<?>> typeItera = ((CompositeType) keyValidator).types.iterator();
-                while (typeItera.hasNext())
-                    keyItera.next().validation_class = typeItera.next().toString();
-            }
-            else
-                keyItera.next().validation_class = keyValidator.toString();
-
-            validator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(3).getValue()));
-            logger.debug("cluster key validator: {}", validator);
-
-            if (keyItera.hasNext() && validator != null && !validator.isEmpty())
-            {
-                AbstractType<?> clusterKeyValidator = parseType(validator);
-
-                if (clusterKeyValidator instanceof CompositeType)
-                {
-                    Iterator<AbstractType<?>> typeItera = ((CompositeType) clusterKeyValidator).types.iterator();
-                    while (keyItera.hasNext())
-                        keyItera.next().validation_class = typeItera.next().toString();
-                }
-                else
-                    keyItera.next().validation_class = clusterKeyValidator.toString();
-            }
-
-            // compact value_alias column
-            if (cqlRow.columns.get(5).value != null)
-            {
-                try
-                {
-                    String compactValidator = ByteBufferUtil.string(ByteBuffer.wrap(cqlRow.columns.get(6).getValue()));
-                    logger.debug("default validator: {}", compactValidator);
-                    AbstractType<?> defaultValidator = parseType(compactValidator);
-
-                    ColumnDef cDef = new ColumnDef();
-                    cDef.name = cqlRow.columns.get(5).value;
-                    cDef.validation_class = defaultValidator.toString();
-                    keys.add(cDef);
-                    hasCompactValueAlias = true;
-                }
-                catch (Exception e)
-                {
-                    // no compact column at value_alias
-                }
-            }
-
-        }
-        return keys;
+        // get CF meta data
+        return client.getCluster().getMetadata().getKeyspace(Metadata.quote(keyspace)).getTable(Metadata.quote(column_family));
     }
 
-
     /** output: (((name, value), (name, value)), (value ... value), (value...value)) */
     public void putNext(Tuple t) throws IOException
     {
@@ -441,6 +327,91 @@ public class CqlNativeStorage extends AbstractCassandraStorage
         return keys;
     }
 
+    /** convert object to ByteBuffer */
+    protected ByteBuffer objToBB(Object o)
+    {
+        if (o == null)
+            return nullToBB();
+        if (o instanceof java.lang.String)
+            return ByteBuffer.wrap(new DataByteArray((String)o).get());
+        if (o instanceof Integer)
+            return Int32Type.instance.decompose((Integer)o);
+        if (o instanceof Long)
+            return LongType.instance.decompose((Long)o);
+        if (o instanceof Float)
+            return FloatType.instance.decompose((Float)o);
+        if (o instanceof Double)
+            return DoubleType.instance.decompose((Double)o);
+        if (o instanceof UUID)
+            return ByteBuffer.wrap(UUIDGen.decompose((UUID) o));
+        if (o instanceof Tuple)
+        {
+            List<Object> objects = ((Tuple) o).getAll();
+            // collections: a leading "set"/"list"/"map" marker string identifies the collection kind
+            if (objects.size() > 0 && objects.get(0) instanceof String)
+            {
+                String collectionType = (String) objects.get(0);
+                if ("set".equalsIgnoreCase(collectionType) ||
+                        "list".equalsIgnoreCase(collectionType))
+                    return objToListOrSetBB(objects.subList(1, objects.size()));
+                else if ("map".equalsIgnoreCase(collectionType))
+                    return objToMapBB(objects.subList(1, objects.size()));
+
+            }
+            return objToCompositeBB(objects);
+        }
+
+        return ByteBuffer.wrap(((DataByteArray) o).get());
+    }
+
+    private ByteBuffer objToListOrSetBB(List<Object> objects)
+    {
+        List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
+        for(Object sub : objects)
+        {
+            ByteBuffer buffer = objToBB(sub);
+            serialized.add(buffer);
+        }
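+        // pack with the v3 native protocol collection encoding (protocol version 3)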
+        return CollectionSerializer.pack(serialized, objects.size(), 3);
+    }
+
+    private ByteBuffer objToMapBB(List<Object> objects)
+    {
+        List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size() * 2);
+        for(Object sub : objects)
+        {
+            List<Object> keyValue = ((Tuple)sub).getAll();
+            for (Object entry: keyValue)
+            {
+                ByteBuffer buffer = objToBB(entry);
+                serialized.add(buffer);
+            }
+        }
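+        // serialized holds alternating key/value buffers; the element count is the number of map entries (protocol version 3)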
+        return CollectionSerializer.pack(serialized, objects.size(), 3);
+    }
+
+    private ByteBuffer objToCompositeBB(List<Object> objects)
+    {
+        List<ByteBuffer> serialized = new ArrayList<ByteBuffer>(objects.size());
+        int totalLength = 0;
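+        // legacy CompositeType encoding: each component is a 2-byte length, the value bytes, and a 0x00 end-of-component byte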
+        for(Object sub : objects)
+        {
+            ByteBuffer buffer = objToBB(sub);
+            serialized.add(buffer);
+            totalLength += 2 + buffer.remaining() + 1;
+        }
+        ByteBuffer out = ByteBuffer.allocate(totalLength);
+        for (ByteBuffer bb : serialized)
+        {
+            int length = bb.remaining();
+            out.put((byte) ((length >> 8) & 0xFF));
+            out.put((byte) (length & 0xFF));
+            out.put(bb);
+            out.put((byte) 0);
+        }
+        out.flip();
+        return out;
+    }
+
     /** send CQL query request using data from tuple */
     private void cqlQueryFromTuple(Map<String, ByteBuffer> key, Tuple t, int offset) throws IOException
     {
@@ -487,30 +458,50 @@ public class CqlNativeStorage extends AbstractCassandraStorage
         }
     }
 
+    /** build a map from column name to its validator (AbstractType) */
+    protected Map<ByteBuffer, AbstractType> getValidatorMap(TableInfo cfDef) throws IOException
+    {
+        Map<ByteBuffer, AbstractType> validators = new HashMap<>();
+        for (ColumnInfo cd : cfDef.getColumns())
+        {
+            if (cd.getTypeName() != null)
+            {
+                try
+                {
+                    AbstractType validator = TypeParser.parseCqlName(cd.getTypeName());
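+                    // counter columns are exposed to Pig as 64-bit longs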
+                    if (validator instanceof CounterColumnType)
+                        validator = LongType.instance;
+                    validators.put(ByteBufferUtil.bytes(cd.getName()), validator);
+                }
+                catch (ConfigurationException | SyntaxException e)
+                {
+                    throw new IOException(e);
+                }
+            }
+        }
+        return validators;
+    }
+
     /** schema: (value, value, value) where keys are in the front. */
     public ResourceSchema getSchema(String location, Job job) throws IOException
     {
         setLocation(location, job);
-        CfInfo cfInfo = getCfInfo(loadSignature);
-        CfDef cfDef = cfInfo.cfDef;
+        TableInfo cfInfo = getCfInfo(loadSignature);
         // top-level schema, no type
         ResourceSchema schema = new ResourceSchema();
 
-        // get default marshallers and validators
-        Map<MarshallerType, AbstractType> marshallers = getDefaultMarshallers(cfDef);
-        Map<ByteBuffer, AbstractType> validators = getValidatorMap(cfDef);
+        // get default validators
+        Map<ByteBuffer, AbstractType> validators = getValidatorMap(cfInfo);
 
         // will contain all fields for this schema
         List<ResourceFieldSchema> allSchemaFields = new ArrayList<ResourceFieldSchema>();
 
-        for (ColumnDef cdef : cfDef.column_metadata)
+        for (ColumnInfo cdef : cfInfo.getColumns())
         {
             ResourceFieldSchema valSchema = new ResourceFieldSchema();
-            AbstractType validator = validators.get(cdef.name);
-            if (validator == null)
-                validator = marshallers.get(MarshallerType.DEFAULT_VALIDATOR);
+            AbstractType validator = validators.get(cdef.getName());
             valSchema.setName(new String(cdef.getName()));
-            valSchema.setType(getPigType(validator));
+            valSchema.setType(StorageHelper.getPigType(validator));
             allSchemaFields.add(valSchema);
         }
 
@@ -522,8 +513,8 @@ public class CqlNativeStorage extends AbstractCassandraStorage
     public void setPartitionFilter(Expression partitionFilter) throws IOException
     {
         UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(AbstractCassandraStorage.class);
-        property.setProperty(PARTITION_FILTER_SIGNATURE, partitionFilterToWhereClauseString(partitionFilter));
+        Properties property = context.getUDFProperties(CqlNativeStorage.class);
+        property.setProperty(StorageHelper.PARTITION_FILTER_SIGNATURE, partitionFilterToWhereClauseString(partitionFilter));
     }
 
     /**
@@ -557,8 +548,8 @@ public class CqlNativeStorage extends AbstractCassandraStorage
     private String getWhereClauseForPartitionFilter()
     {
         UDFContext context = UDFContext.getUDFContext();
-        Properties property = context.getUDFProperties(AbstractCassandraStorage.class);
-        return property.getProperty(PARTITION_FILTER_SIGNATURE);
+        Properties property = context.getUDFProperties(CqlNativeStorage.class);
+        return property.getProperty(StorageHelper.PARTITION_FILTER_SIGNATURE);
     }
 
     /** set read configuration settings */
@@ -631,7 +622,7 @@ public class CqlNativeStorage extends AbstractCassandraStorage
             CqlConfigHelper.setInputWhereClauses(conf, whereClause);
 
         String whereClauseForPartitionFilter = getWhereClauseForPartitionFilter();
-        String wc = whereClause != null && !whereClause.trim().isEmpty() 
+        String wc = whereClause != null && !whereClause.trim().isEmpty()
                                ? whereClauseForPartitionFilter == null ? whereClause: String.format("%s AND %s", whereClause.trim(), whereClauseForPartitionFilter)
                                : whereClauseForPartitionFilter;
 
@@ -639,17 +630,17 @@ public class CqlNativeStorage extends AbstractCassandraStorage
         {
             logger.debug("where clause: {}", wc);
             CqlConfigHelper.setInputWhereClauses(conf, wc);
-        } 
-        if (System.getenv(PIG_INPUT_SPLIT_SIZE) != null)
+        }
+        if (System.getenv(StorageHelper.PIG_INPUT_SPLIT_SIZE) != null)
         {
             try
             {
-                ConfigHelper.setInputSplitSize(conf, Integer.parseInt(System.getenv(PIG_INPUT_SPLIT_SIZE)));
+                ConfigHelper.setInputSplitSize(conf, Integer.parseInt(System.getenv(StorageHelper.PIG_INPUT_SPLIT_SIZE)));
             }
             catch (NumberFormatException e)
             {
                 throw new IOException("PIG_INPUT_SPLIT_SIZE is not a number", e);
-            }           
+            }
         }
 
         if (ConfigHelper.getInputInitialAddress(conf) == null)
@@ -700,6 +691,74 @@ public class CqlNativeStorage extends AbstractCassandraStorage
         initSchema(storeSignature);
     }
 
+    /** fetch the table schema from Cassandra and cache it in the UDF context */
+    protected void initSchema(String signature) throws IOException
+    {
+        Properties properties = UDFContext.getUDFContext().getUDFProperties(CqlNativeStorage.class);
+
+        // Only get the schema if we haven't already gotten it
+        if (!properties.containsKey(signature))
+        {
+            try
+            {
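+                // open a driver session against the configured input address to read the table schema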
+                Session client = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf), conf).connect();
+                client.execute("USE " + keyspace);
+
+                // fetch the TableMetadata for the column family
+                TableMetadata cfInfo = getCfInfo(client);
+
+                if (cfInfo != null)
+                {
+                    properties.setProperty(signature, cfdefToString(cfInfo));
+                }
+                else
+                    throw new IOException(String.format("Table '%s' not found in keyspace '%s'",
+                            column_family,
+                            keyspace));
+            }
+            catch (Exception e)
+            {
+                throw new IOException(e);
+            }
+        }
+    }
+
+
+    /** serialize the table metadata to a Base64-encoded TableInfo string */
+    protected static String cfdefToString(TableMetadata cfDef) throws IOException
+    {
+        TableInfo tableInfo = new TableInfo(cfDef);
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream oos = new ObjectOutputStream(baos);
+        oos.writeObject(tableInfo);
+        oos.close();
+        return new String(Base64Coder.encode(baos.toByteArray()));
+    }
+
+    /** deserialize the Base64-encoded string back into a TableInfo */
+    protected static TableInfo cfdefFromString(String st) throws IOException, ClassNotFoundException
+    {
+        byte[] data = Base64Coder.decode(st);
+        ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data));
+        Object o = ois.readObject();
+        ois.close();
+        return (TableInfo) o;
+    }
+
+    /** parse the location query string into a parameter map */
+    public static Map<String, String> getQueryMap(String query) throws UnsupportedEncodingException
+    {
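+        // expected format: name1=value1&name2=value2, with values URL-encoded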
+        String[] params = query.split("&");
+        Map<String, String> map = new HashMap<String, String>(params.length);
+        for (String param : params)
+        {
+            String[] keyValue = param.split("=");
+            map.put(keyValue[0], URLDecoder.decode(keyValue[1], "UTF-8"));
+        }
+        return map;
+    }
+
     private void setLocationFromUri(String location) throws IOException
     {
         try
@@ -808,11 +867,171 @@ public class CqlNativeStorage extends AbstractCassandraStorage
         }
     }
 
-    /**
-     * Thrift API can't handle null, so use empty byte array
-     */
     public ByteBuffer nullToBB()
     {
         return ByteBuffer.wrap(new byte[0]);
     }
+
+    /** output format */
+    public OutputFormat getOutputFormat() throws IOException
+    {
+        try
+        {
+            return FBUtilities.construct(outputFormatClass, "outputformat");
+        }
+        catch (ConfigurationException e)
+        {
+            throw new IOException(e);
+        }
+    }
+
+    public void cleanupOnFailure(String failure, Job job)
+    {
+    }
+
+    public void cleanupOnSuccess(String location, Job job) throws IOException
+    {
+    }
+
+    /** return partition keys */
+    public String[] getPartitionKeys(String location, Job job) throws IOException
+    {
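+        // partition keys are only needed when push-down partition filtering is enabled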
+        if (!usePartitionFilter)
+            return null;
+        TableInfo tableMetadata = getCfInfo(loadSignature);
+        String[] partitionKeys = new String[tableMetadata.getPartitionKey().size()];
+        for (int i = 0; i < tableMetadata.getPartitionKey().size(); i++)
+        {
+            partitionKeys[i] = new String(tableMetadata.getPartitionKey().get(i).getName());
+        }
+        return partitionKeys;
+    }
+
+    public void checkSchema(ResourceSchema schema) throws IOException
+    {
+        // we don't care about types, they all get casted to ByteBuffers
+    }
+
+    public ResourceStatistics getStatistics(String location, Job job)
+    {
+        return null;
+    }
+
+    @Override
+    public InputFormat getInputFormat() throws IOException
+    {
+        try
+        {
+            return FBUtilities.construct(inputFormatClass, "inputformat");
+        }
+        catch (ConfigurationException e)
+        {
+            throw new IOException(e);
+        }
+    }
+
+    public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException
+    {
+        return relativeToAbsolutePath(location, curDir);
+    }
+
+    @Override
+    public String relativeToAbsolutePath(String location, Path curDir) throws IOException
+    {
+        return location;
+    }
+
+    @Override
+    public void setUDFContextSignature(String signature)
+    {
+        this.loadSignature = signature;
+    }
+
+    /** StoreFunc methods */
+    public void setStoreFuncUDFContextSignature(String signature)
+    {
+        this.storeSignature = signature;
+    }
+
+    /** set hadoop cassandra connection settings */
+    protected void setConnectionInformation() throws IOException
+    {
+        StorageHelper.setConnectionInformation(conf);
+        if (System.getenv(StorageHelper.PIG_INPUT_FORMAT) != null)
+            inputFormatClass = getFullyQualifiedClassName(System.getenv(StorageHelper.PIG_INPUT_FORMAT));
+        else
+            inputFormatClass = DEFAULT_INPUT_FORMAT;
+        if (System.getenv(StorageHelper.PIG_OUTPUT_FORMAT) != null)
+            outputFormatClass = getFullyQualifiedClassName(System.getenv(StorageHelper.PIG_OUTPUT_FORMAT));
+        else
+            outputFormatClass = DEFAULT_OUTPUT_FORMAT;
+    }
+
+    /** get the fully qualified class name, defaulting to the org.apache.cassandra.hadoop package */
+    protected String getFullyQualifiedClassName(String classname)
+    {
+        return classname.contains(".") ? classname : "org.apache.cassandra.hadoop." + classname;
+    }
+}
+
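+/** serializable snapshot of the driver's TableMetadata, stored Base64-encoded in the UDF context properties */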
+class TableInfo implements Serializable
+{
+    private final List<ColumnInfo> columns;
+    private final List<ColumnInfo> partitionKey;
+    private final String name;
+
+    public TableInfo(TableMetadata tableMetadata)
+    {
+        List<ColumnMetadata> cmColumns = tableMetadata.getColumns();
+        columns = new ArrayList<>(cmColumns.size());
+        for (ColumnMetadata cm : cmColumns)
+        {
+            columns.add(new ColumnInfo(this, cm));
+        }
+        List<ColumnMetadata> cmPartitionKey = tableMetadata.getPartitionKey();
+        partitionKey = new ArrayList<>(cmPartitionKey.size());
+        for (ColumnMetadata cm : cmPartitionKey)
+        {
+            partitionKey.add(new ColumnInfo(this, cm));
+        }
+        name = tableMetadata.getName();
+    }
+
+    public List<ColumnInfo> getPartitionKey()
+    {
+        return partitionKey;
+    }
+
+    public List<ColumnInfo> getColumns()
+    {
+        return columns;
+    }
+
+    public String getName()
+    {
+        return name;
+    }
 }
+
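+/** serializable column name and CQL type name captured from the driver's ColumnMetadata */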
+class ColumnInfo implements Serializable
+{
+    private final TableInfo table;
+    private final String name;
+    private final String typeName;
+
+    public ColumnInfo(TableInfo tableInfo, ColumnMetadata columnMetadata)
+    {
+        table = tableInfo;
+        name = columnMetadata.getName();
+        typeName = columnMetadata.getType().toString();
+    }
+
+    public String getName()
+    {
+        return name;
+    }
+
+    public String getTypeName()
+    {
+        return typeName;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/pig/StorageHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/StorageHelper.java b/src/java/org/apache/cassandra/hadoop/pig/StorageHelper.java
new file mode 100644
index 0000000..66836b2
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/pig/StorageHelper.java
@@ -0,0 +1,121 @@
+package org.apache.cassandra.hadoop.pig;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.Date;
+import java.util.UUID;
+
+import org.apache.cassandra.db.marshal.*;
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.serializers.CollectionSerializer;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.UUIDGen;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+
+public class StorageHelper
+{
+    // system environment variables that can be set to configure connection info:
+    // alternatively, Hadoop JobConf variables can be set using keys from ConfigHelper
+    public final static String PIG_INPUT_RPC_PORT = "PIG_INPUT_RPC_PORT";
+    public final static String PIG_INPUT_INITIAL_ADDRESS = "PIG_INPUT_INITIAL_ADDRESS";
+    public final static String PIG_INPUT_PARTITIONER = "PIG_INPUT_PARTITIONER";
+    public final static String PIG_OUTPUT_RPC_PORT = "PIG_OUTPUT_RPC_PORT";
+    public final static String PIG_OUTPUT_INITIAL_ADDRESS = "PIG_OUTPUT_INITIAL_ADDRESS";
+    public final static String PIG_OUTPUT_PARTITIONER = "PIG_OUTPUT_PARTITIONER";
+    public final static String PIG_RPC_PORT = "PIG_RPC_PORT";
+    public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS";
+    public final static String PIG_PARTITIONER = "PIG_PARTITIONER";
+    public final static String PIG_INPUT_FORMAT = "PIG_INPUT_FORMAT";
+    public final static String PIG_OUTPUT_FORMAT = "PIG_OUTPUT_FORMAT";
+    public final static String PIG_INPUT_SPLIT_SIZE = "PIG_INPUT_SPLIT_SIZE";
+
+
+    public final static String PARTITION_FILTER_SIGNATURE = "cassandra.partition.filter";
+
+    protected static void setConnectionInformation(Configuration conf)
+    {
+        if (System.getenv(PIG_RPC_PORT) != null)
+        {
+            ConfigHelper.setInputRpcPort(conf, System.getenv(PIG_RPC_PORT));
+            ConfigHelper.setOutputRpcPort(conf, System.getenv(PIG_RPC_PORT));
+        }
+
+        if (System.getenv(PIG_INPUT_RPC_PORT) != null)
+            ConfigHelper.setInputRpcPort(conf, System.getenv(PIG_INPUT_RPC_PORT));
+        if (System.getenv(PIG_OUTPUT_RPC_PORT) != null)
+            ConfigHelper.setOutputRpcPort(conf, System.getenv(PIG_OUTPUT_RPC_PORT));
+
+        if (System.getenv(PIG_INITIAL_ADDRESS) != null)
+        {
+            ConfigHelper.setInputInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS));
+            ConfigHelper.setOutputInitialAddress(conf, System.getenv(PIG_INITIAL_ADDRESS));
+        }
+        if (System.getenv(PIG_INPUT_INITIAL_ADDRESS) != null)
+            ConfigHelper.setInputInitialAddress(conf, System.getenv(PIG_INPUT_INITIAL_ADDRESS));
+        if (System.getenv(PIG_OUTPUT_INITIAL_ADDRESS) != null)
+            ConfigHelper.setOutputInitialAddress(conf, System.getenv(PIG_OUTPUT_INITIAL_ADDRESS));
+
+        if (System.getenv(PIG_PARTITIONER) != null)
+        {
+            ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_PARTITIONER));
+            ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_PARTITIONER));
+        }
+        if(System.getenv(PIG_INPUT_PARTITIONER) != null)
+            ConfigHelper.setInputPartitioner(conf, System.getenv(PIG_INPUT_PARTITIONER));
+        if(System.getenv(PIG_OUTPUT_PARTITIONER) != null)
+            ConfigHelper.setOutputPartitioner(conf, System.getenv(PIG_OUTPUT_PARTITIONER));
+    }
+
+    protected static Object cassandraToObj(AbstractType validator, ByteBuffer value, int nativeProtocolVersion)
+    {
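+        // decimal and inet values are rendered via getString() so Pig treats them as chararray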
+        if (validator instanceof DecimalType || validator instanceof InetAddressType)
+            return validator.getString(value);
+
+        if (validator instanceof CollectionType)
+        {
+            // For CollectionType, the compose() method assumes the v3 protocol format for collections,
+            // which may not match the format of the data read here, so deserialize with the supplied protocol version
+            return ((CollectionSerializer)validator.getSerializer()).deserializeForNativeProtocol(value, nativeProtocolVersion);
+        }
+
+        return validator.compose(value);
+    }
+
+    /** set the value at the given position in the tuple */
+    protected static void setTupleValue(Tuple pair, int position, Object value) throws ExecException
+    {
+        if (value instanceof BigInteger)
+            pair.set(position, ((BigInteger) value).intValue());
+        else if (value instanceof ByteBuffer)
+            pair.set(position, new DataByteArray(ByteBufferUtil.getArray((ByteBuffer) value)));
+        else if (value instanceof UUID)
+            pair.set(position, new DataByteArray(UUIDGen.decompose((java.util.UUID) value)));
+        else if (value instanceof Date)
+            pair.set(position, TimestampType.instance.decompose((Date) value).getLong());
+        else
+            pair.set(position, value);
+    }
+
+    /** get the Pig type for the Cassandra data type */
+    protected static byte getPigType(AbstractType type)
+    {
+        if (type instanceof LongType || type instanceof DateType || type instanceof TimestampType) // DateType is bad and it should feel bad
+            return DataType.LONG;
+        else if (type instanceof IntegerType || type instanceof Int32Type) // IntegerType will overflow at 2**31, but is kept for compatibility until pig has a BigInteger
+            return DataType.INTEGER;
+        else if (type instanceof AsciiType || type instanceof UTF8Type || type instanceof DecimalType || type instanceof InetAddressType)
+            return DataType.CHARARRAY;
+        else if (type instanceof FloatType)
+            return DataType.FLOAT;
+        else if (type instanceof DoubleType)
+            return DataType.DOUBLE;
+        else if (type instanceof AbstractCompositeType || type instanceof CollectionType)
+            return DataType.TUPLE;
+
+        return DataType.BYTEARRAY;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
index 06d83dd..6991958 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
@@ -92,7 +92,7 @@ public class SSTableLoader implements StreamEventHandler
                     return false;
                 }
 
-                CFMetaData metadata = client.getCFMetaData(keyspace, desc.cfname);
+                CFMetaData metadata = client.getTableMetadata(desc.cfname);
                 if (metadata == null)
                 {
                     outputHandler.output(String.format("Skipping file %s: table %s.%s doesn't exist", name, keyspace, desc.cfname));
@@ -251,7 +251,9 @@ public class SSTableLoader implements StreamEventHandler
         /**
          * Stop the client.
          */
-        public void stop() {}
+        public void stop()
+        {
+        }
 
         /**
          * Provides connection factory.
@@ -268,7 +270,12 @@ public class SSTableLoader implements StreamEventHandler
          * Validate that {@code keyspace} is an existing keyspace and {@code
          * cfName} one of its existing column family.
          */
-        public abstract CFMetaData getCFMetaData(String keyspace, String cfName);
+        public abstract CFMetaData getTableMetadata(String tableName);
+
+        public void setTableMetadata(CFMetaData cfm)
+        {
+            throw new RuntimeException();
+        }
 
         public Map<InetAddress, Collection<Range<Token>>> getEndpointToRangesMap()
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index d6ce46e..c17d2d7 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -4117,8 +4117,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
         SSTableLoader.Client client = new SSTableLoader.Client()
         {
+            private String keyspace;
+
             public void init(String keyspace)
             {
+                this.keyspace = keyspace;
                 try
                 {
                     setPartitioner(DatabaseDescriptor.getPartitioner());
@@ -4135,14 +4138,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 }
             }
 
-            public CFMetaData getCFMetaData(String keyspace, String cfName)
+            public CFMetaData getTableMetadata(String tableName)
             {
-                return Schema.instance.getCFMetaData(keyspace, cfName);
+                return Schema.instance.getCFMetaData(keyspace, tableName);
             }
         };
 
-        SSTableLoader loader = new SSTableLoader(dir, client, new OutputHandler.LogOutput());
-        return loader.stream();
+        return new SSTableLoader(dir, client, new OutputHandler.LogOutput()).stream();
     }
 
     public void rescheduleFailedDeletions()