You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/05/05 22:57:01 UTC

[4/4] cassandra git commit: Remove Thrift dependencies in bundled tools

Remove Thrift dependencies in bundled tools

patch by Philip Thompson; reviewed by Aleksey Yeschenko for
CASSANDRA-8358


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f698cc22
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f698cc22
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f698cc22

Branch: refs/heads/trunk
Commit: f698cc228452e847e3ad46bd8178549cf8171767
Parents: 5b61545
Author: Philip Thompson <pt...@gmail.com>
Authored: Tue May 5 21:38:23 2015 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Tue May 5 23:57:39 2015 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   3 +
 build.xml                                       |   8 +-
 lib/cassandra-driver-core-2.1.2.jar             | Bin 638544 -> 0 bytes
 lib/cassandra-driver-core-2.1.5-shaded.jar      | Bin 0 -> 1994984 bytes
 .../apache/cassandra/db/marshal/TypeParser.java |  71 +-
 .../hadoop/AbstractBulkOutputFormat.java        |  73 --
 .../hadoop/AbstractBulkRecordWriter.java        | 239 -------
 .../hadoop/AbstractColumnFamilyInputFormat.java | 245 +++----
 .../AbstractColumnFamilyOutputFormat.java       | 164 -----
 .../AbstractColumnFamilyRecordWriter.java       | 193 -----
 .../cassandra/hadoop/BulkOutputFormat.java      |  51 +-
 .../cassandra/hadoop/BulkRecordWriter.java      | 145 +++-
 .../hadoop/ColumnFamilyInputFormat.java         |  47 +-
 .../hadoop/ColumnFamilyOutputFormat.java        | 119 +++-
 .../hadoop/ColumnFamilyRecordReader.java        |   1 +
 .../hadoop/ColumnFamilyRecordWriter.java        | 124 +++-
 .../hadoop/cql3/CqlBulkOutputFormat.java        |  81 ++-
 .../hadoop/cql3/CqlBulkRecordWriter.java        | 223 ++++--
 .../cassandra/hadoop/cql3/CqlConfigHelper.java  |  52 +-
 .../cassandra/hadoop/cql3/CqlOutputFormat.java  |  76 +-
 .../cassandra/hadoop/cql3/CqlRecordReader.java  |  93 ++-
 .../cassandra/hadoop/cql3/CqlRecordWriter.java  | 392 ++++++----
 .../cassandra/hadoop/pig/CassandraStorage.java  | 706 +++++++++++++++++--
 .../cassandra/hadoop/pig/CqlNativeStorage.java  | 629 +++++++++++------
 .../cassandra/hadoop/pig/StorageHelper.java     | 121 ++++
 .../cassandra/io/sstable/SSTableLoader.java     |  13 +-
 .../cassandra/service/StorageService.java       |  10 +-
 .../org/apache/cassandra/tools/BulkLoader.java  | 209 ++----
 .../utils/NativeSSTableLoaderClient.java        | 126 ++++
 .../org/apache/cassandra/pig/CqlTableTest.java  |  81 +--
 .../io/sstable/CQLSSTableWriterTest.java        |  14 +-
 .../cassandra/io/sstable/SSTableLoaderTest.java |   7 +-
 33 files changed, 2678 insertions(+), 1639 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8f89ece..ab92aa0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0
+ * Remove Thrift dependencies in bundled tools (CASSANDRA-8358)
  * Disable memory mapping of hsperfdata file for JVM statistics (CASSANDRA-9242)
  * Add pre-startup checks to detect potential incompatibilities (CASSANDRA-8049)
  * Distinguish between null and unset in protocol v4 (CASSANDRA-7304)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 03008de..32351a1 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -75,6 +75,9 @@ New features
 Upgrading
 ---------
    - Pig's CqlStorage has been removed, use CqlNativeStorage instead
+   - Pig's CassandraStorage has been deprecated. CassandraStorage
+     should only be used against tables created via thrift.
+     Use CqlNativeStorage for all other tables.
    - IAuthenticator been updated to remove responsibility for user/role
      maintenance and is now solely responsible for validating credentials,
      This is primarily done via SASL, though an optional method exists for

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index ba99cd9..a5f195f 100644
--- a/build.xml
+++ b/build.xml
@@ -381,7 +381,7 @@
           <dependency groupId="io.netty" artifactId="netty-all" version="4.0.23.Final" />
           <dependency groupId="com.google.code.findbugs" artifactId="jsr305" version="2.0.2" />
           <dependency groupId="com.clearspring.analytics" artifactId="stream" version="2.5.2" />
-          <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" version="2.1.2" />
+          <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" version="2.1.5" classifier="shaded" />
           <dependency groupId="org.javassist" artifactId="javassist" version="3.18.2-GA" />
           <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core" version="0.3.4" />
           <dependency groupId="net.sf.supercsv" artifactId="super-csv" version="2.1.0" />
@@ -433,7 +433,7 @@
         <dependency groupId="org.apache.pig" artifactId="pig"/>
       	<dependency groupId="com.google.code.findbugs" artifactId="jsr305"/>
         <dependency groupId="org.antlr" artifactId="antlr"/>
-        <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core"/>
+        <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" classifier="shaded"/>
         <dependency groupId="org.javassist" artifactId="javassist"/>
         <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core"/>
         <dependency groupId="org.openjdk.jmh" artifactId="jmh-core"/>
@@ -501,12 +501,12 @@
 
         <dependency groupId="org.apache.thrift" artifactId="libthrift"/>
         <dependency groupId="org.apache.cassandra" artifactId="cassandra-thrift"/>
-        
+
         <!-- don't need hadoop classes to run, but if you use the hadoop stuff -->
         <dependency groupId="org.apache.hadoop" artifactId="hadoop-core" optional="true"/>
         <dependency groupId="org.apache.hadoop" artifactId="hadoop-minicluster" optional="true"/>
         <dependency groupId="org.apache.pig" artifactId="pig" optional="true"/>
-      	<dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" optional="true"/>
+        <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" classifier="shaded" optional="true"/>
 
 
         <!-- don't need jna to run, but nice to have -->

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/lib/cassandra-driver-core-2.1.2.jar
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-core-2.1.2.jar b/lib/cassandra-driver-core-2.1.2.jar
deleted file mode 100644
index 2095c05..0000000
Binary files a/lib/cassandra-driver-core-2.1.2.jar and /dev/null differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/lib/cassandra-driver-core-2.1.5-shaded.jar
----------------------------------------------------------------------
diff --git a/lib/cassandra-driver-core-2.1.5-shaded.jar b/lib/cassandra-driver-core-2.1.5-shaded.jar
new file mode 100644
index 0000000..bb83fb5
Binary files /dev/null and b/lib/cassandra-driver-core-2.1.5-shaded.jar differ

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/db/marshal/TypeParser.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/marshal/TypeParser.java b/src/java/org/apache/cassandra/db/marshal/TypeParser.java
index ad7ffed..faa678e 100644
--- a/src/java/org/apache/cassandra/db/marshal/TypeParser.java
+++ b/src/java/org/apache/cassandra/db/marshal/TypeParser.java
@@ -21,13 +21,9 @@ import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
+import org.apache.cassandra.cql3.CQL3Type;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
@@ -42,7 +38,7 @@ public class TypeParser
     private int idx;
 
     // A cache of parsed string, specially useful for DynamicCompositeType
-    private static final Map<String, AbstractType<?>> cache = new HashMap<String, AbstractType<?>>();
+    private static final Map<String, AbstractType<?>> cache = new HashMap<>();
 
     public static final TypeParser EMPTY_PARSER = new TypeParser("", 0);
 
@@ -98,9 +94,48 @@ public class TypeParser
         return parse(compareWith == null ? null : compareWith.toString());
     }
 
-    public static String getShortName(AbstractType<?> type)
+    public static String parseCqlNativeType(String str)
     {
-        return type.getClass().getSimpleName();
+        return CQL3Type.Native.valueOf(str.trim().toUpperCase(Locale.ENGLISH)).getType().toString();
+    }
+
+    public static String parseCqlCollectionOrFrozenType(String str) throws SyntaxException
+    {
+        str = str.trim().toLowerCase();
+        switch (str)
+        {
+            case "map": return "MapType";
+            case "set": return "SetType";
+            case "list": return "ListType";
+            case "frozen": return "FrozenType";
+            default: throw new SyntaxException("Invalid type name" + str);
+        }
+    }
+
+    /**
+     * Turns user facing type names into Abstract Types, 'text' -> UTF8Type
+     */
+    public static AbstractType<?> parseCqlName(String str) throws SyntaxException, ConfigurationException
+    {
+        return parse(parseCqlNameRecurse(str));
+    }
+
+    private static String parseCqlNameRecurse(String str) throws SyntaxException
+    {
+        if (str.indexOf(',') >= 0 && (!str.contains("<") || (str.indexOf(',') < str.indexOf('<'))))
+        {
+            String[] parseString = str.split(",", 2);
+            return parseCqlNameRecurse(parseString[0]) + "," + parseCqlNameRecurse(parseString[1]);
+        }
+        else if (str.contains("<"))
+        {
+            String[] parseString = str.trim().split("<", 2);
+            return parseCqlCollectionOrFrozenType(parseString[0]) + "(" + parseCqlNameRecurse(parseString[1].substring(0, parseString[1].length()-1)) + ")";
+        }
+        else
+        {
+            return parseCqlNativeType(str);
+        }
     }
 
     /**
@@ -126,7 +161,7 @@ public class TypeParser
         if (str.charAt(idx) != '(')
             throw new IllegalStateException();
 
-        Map<String, String> map = new HashMap<String, String>();
+        Map<String, String> map = new HashMap<>();
         ++idx; // skipping '('
 
         while (skipBlankAndComma())
@@ -157,7 +192,7 @@ public class TypeParser
 
     public List<AbstractType<?>> getTypeParameters() throws SyntaxException, ConfigurationException
     {
-        List<AbstractType<?>> list = new ArrayList<AbstractType<?>>();
+        List<AbstractType<?>> list = new ArrayList<>();
 
         if (isEOS())
             return list;
@@ -191,7 +226,7 @@ public class TypeParser
 
     public Map<Byte, AbstractType<?>> getAliasParameters() throws SyntaxException, ConfigurationException
     {
-        Map<Byte, AbstractType<?>> map = new HashMap<Byte, AbstractType<?>>();
+        Map<Byte, AbstractType<?>> map = new HashMap<>();
 
         if (isEOS())
             return map;
@@ -384,11 +419,7 @@ public class TypeParser
             Field field = typeClass.getDeclaredField("instance");
             return (AbstractType<?>) field.get(null);
         }
-        catch (NoSuchFieldException e)
-        {
-            throw new ConfigurationException("Invalid comparator class " + typeClass.getName() + ": must define a public static instance field or a public static method getInstance(TypeParser).");
-        }
-        catch (IllegalAccessException e)
+        catch (NoSuchFieldException | IllegalAccessException e)
         {
             throw new ConfigurationException("Invalid comparator class " + typeClass.getName() + ": must define a public static instance field or a public static method getInstance(TypeParser).");
         }
@@ -489,12 +520,6 @@ public class TypeParser
         return str.substring(i, idx);
     }
 
-    public char readNextChar()
-    {
-        skipBlank();
-        return str.charAt(idx++);
-    }
-
     /**
      * Helper function to ease the writing of AbstractType.toString() methods.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java
deleted file mode 100644
index c0e91da..0000000
--- a/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.hadoop;
-
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.*;
-
-public abstract class AbstractBulkOutputFormat<K, V> extends OutputFormat<K, V>
-    implements org.apache.hadoop.mapred.OutputFormat<K, V>
-{
-    @Override
-    public void checkOutputSpecs(JobContext context)
-    {
-        checkOutputSpecs(HadoopCompat.getConfiguration(context));
-    }
-
-    private void checkOutputSpecs(Configuration conf)
-    {
-        if (ConfigHelper.getOutputKeyspace(conf) == null)
-        {
-            throw new UnsupportedOperationException("you must set the keyspace with setColumnFamily()");
-        }
-    }
-
-    @Override
-    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
-    {
-        return new NullOutputCommitter();
-    }
-
-    /** Fills the deprecated OutputFormat interface for streaming. */
-    @Deprecated
-    public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
-    {
-        checkOutputSpecs(job);
-    }
-
-    public static class NullOutputCommitter extends OutputCommitter
-    {
-        public void abortTask(TaskAttemptContext taskContext) { }
-
-        public void cleanupJob(JobContext jobContext) { }
-
-        public void commitTask(TaskAttemptContext taskContext) { }
-
-        public boolean needsTaskCommit(TaskAttemptContext taskContext)
-        {
-            return false;
-        }
-
-        public void setupJob(JobContext jobContext) { }
-
-        public void setupTask(TaskAttemptContext taskContext) { }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java
deleted file mode 100644
index 5ba0a96..0000000
--- a/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.hadoop;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.*;
-import java.util.concurrent.*;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.auth.PasswordAuthenticator;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Config;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.io.sstable.SSTableLoader;
-import org.apache.cassandra.streaming.StreamState;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.OutputHandler;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.Progressable;
-
-public abstract class AbstractBulkRecordWriter<K, V> extends RecordWriter<K, V>
-implements org.apache.hadoop.mapred.RecordWriter<K, V>
-{
-    public final static String OUTPUT_LOCATION = "mapreduce.output.bulkoutputformat.localdir";
-    public final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize";
-    public final static String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits";
-    public final static String MAX_FAILED_HOSTS = "mapreduce.output.bulkoutputformat.maxfailedhosts";
-    
-    private final Logger logger = LoggerFactory.getLogger(AbstractBulkRecordWriter.class);
-    
-    protected final Configuration conf;
-    protected final int maxFailures;
-    protected final int bufferSize; 
-    protected Closeable writer;
-    protected SSTableLoader loader;
-    protected Progressable progress;
-    protected TaskAttemptContext context;
-    
-    protected AbstractBulkRecordWriter(TaskAttemptContext context)
-    {
-        this(HadoopCompat.getConfiguration(context));
-        this.context = context;
-    }
-
-    protected AbstractBulkRecordWriter(Configuration conf, Progressable progress)
-    {
-        this(conf);
-        this.progress = progress;
-    }
-
-    protected AbstractBulkRecordWriter(Configuration conf)
-    {
-        Config.setOutboundBindAny(true);
-        this.conf = conf;
-        DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.parseInt(conf.get(STREAM_THROTTLE_MBITS, "0")));
-        maxFailures = Integer.parseInt(conf.get(MAX_FAILED_HOSTS, "0"));
-        bufferSize = Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64"));
-    }
-
-    protected String getOutputLocation() throws IOException
-    {
-        String dir = conf.get(OUTPUT_LOCATION, System.getProperty("java.io.tmpdir"));
-        if (dir == null)
-            throw new IOException("Output directory not defined, if hadoop is not setting java.io.tmpdir then define " + OUTPUT_LOCATION);
-        return dir;
-    }
-
-    @Override
-    public void close(TaskAttemptContext context) throws IOException, InterruptedException
-    {
-        close();
-    }
-
-    /** Fills the deprecated RecordWriter interface for streaming. */
-    @Deprecated
-    public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
-    {
-        close();
-    }
-
-    private void close() throws IOException
-    {
-        if (writer != null)
-        {
-            writer.close();
-            Future<StreamState> future = loader.stream();
-            while (true)
-            {
-                try
-                {
-                    future.get(1000, TimeUnit.MILLISECONDS);
-                    break;
-                }
-                catch (ExecutionException | TimeoutException te)
-                {
-                    if (null != progress)
-                        progress.progress();
-                    if (null != context)
-                        HadoopCompat.progress(context);
-                }
-                catch (InterruptedException e)
-                {
-                    throw new IOException(e);
-                }
-            }
-            if (loader.getFailedHosts().size() > 0)
-            {
-                if (loader.getFailedHosts().size() > maxFailures)
-                    throw new IOException("Too many hosts failed: " + loader.getFailedHosts());
-                else
-                    logger.warn("Some hosts failed: {}", loader.getFailedHosts());
-            }
-        }
-    }
-
-    public static class ExternalClient extends SSTableLoader.Client
-    {
-        private final Map<String, Map<String, CFMetaData>> knownCfs = new HashMap<>();
-        private final Configuration conf;
-        private final String hostlist;
-        private final int rpcPort;
-        private final String username;
-        private final String password;
-
-        public ExternalClient(Configuration conf)
-        {
-          super();
-          this.conf = conf;
-          this.hostlist = ConfigHelper.getOutputInitialAddress(conf);
-          this.rpcPort = ConfigHelper.getOutputRpcPort(conf);
-          this.username = ConfigHelper.getOutputKeyspaceUserName(conf);
-          this.password = ConfigHelper.getOutputKeyspacePassword(conf);
-        }
-
-        public void init(String keyspace)
-        {
-            String[] nodes = hostlist.split(",");
-            Set<InetAddress> hosts = new HashSet<InetAddress>(nodes.length);
-            for (String node : nodes)
-            {
-                try
-                {
-                    hosts.add(InetAddress.getByName(node));
-                }
-                catch (UnknownHostException e)
-                {
-                    throw new RuntimeException(e);
-                }
-            }
-            Iterator<InetAddress> hostiter = hosts.iterator();
-            while (hostiter.hasNext())
-            {
-                try
-                {
-                    InetAddress host = hostiter.next();
-                    Cassandra.Client client = ConfigHelper.createConnection(conf, host.getHostAddress(), rpcPort);
-
-                    // log in
-                    client.set_keyspace(keyspace);
-                    if (username != null)
-                    {
-                        Map<String, String> creds = new HashMap<String, String>();
-                        creds.put(PasswordAuthenticator.USERNAME_KEY, username);
-                        creds.put(PasswordAuthenticator.PASSWORD_KEY, password);
-                        AuthenticationRequest authRequest = new AuthenticationRequest(creds);
-                        client.login(authRequest);
-                    }
-
-                    List<TokenRange> tokenRanges = client.describe_ring(keyspace);
-                    List<KsDef> ksDefs = client.describe_keyspaces();
-
-                    setPartitioner(client.describe_partitioner());
-                    Token.TokenFactory tkFactory = getPartitioner().getTokenFactory();
-
-                    for (TokenRange tr : tokenRanges)
-                    {
-                        Range<Token> range = new Range<Token>(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token));
-                        for (String ep : tr.endpoints)
-                        {
-                            addRangeForEndpoint(range, InetAddress.getByName(ep));
-                        }
-                    }
-
-                    for (KsDef ksDef : ksDefs)
-                    {
-                        Map<String, CFMetaData> cfs = new HashMap<>(ksDef.cf_defs.size());
-                        for (CfDef cfDef : ksDef.cf_defs)
-                            cfs.put(cfDef.name, ThriftConversion.fromThrift(cfDef));
-                        knownCfs.put(ksDef.name, cfs);
-                    }
-                    break;
-                }
-                catch (Exception e)
-                {
-                    if (!hostiter.hasNext())
-                        throw new RuntimeException("Could not retrieve endpoint ranges: ", e);
-                }
-            }
-        }
-
-        public CFMetaData getCFMetaData(String keyspace, String cfName)
-        {
-            Map<String, CFMetaData> cfs = knownCfs.get(keyspace);
-            return cfs != null ? cfs.get(cfName) : null;
-        } 
-    }
-
-    public static class NullOutputHandler implements OutputHandler
-    {
-        public void output(String msg) {}
-        public void debug(String msg) {}
-        public void warn(String msg) {}
-        public void warn(String msg, Throwable th) {}
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
index 691bd76..2ef4cf4 100644
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
@@ -18,31 +18,27 @@
 package org.apache.cassandra.hadoop;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.*;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.auth.PasswordAuthenticator;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TokenRange;
+import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.hadoop.cql3.*;
+import org.apache.cassandra.thrift.KeyRange;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.*;
-import org.apache.thrift.TApplicationException;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-
 
 public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<K, Y> implements org.apache.hadoop.mapred.InputFormat<K, Y>
 {
@@ -51,7 +47,7 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
     public static final String MAPRED_TASK_ID = "mapred.task.id";
     // The simple fact that we need this is because the old Hadoop API wants us to "write"
     // to the key and value whereas the new asks for it.
-    // I choose 8kb as the default max key size (instanciated only once), but you can
+    // I choose 8kb as the default max key size (instantiated only once), but you can
     // override it in your jobConf with this setting.
     public static final String CASSANDRA_HADOOP_MAX_KEY_SIZE = "cassandra.hadoop.max_key_size";
     public static final int    CASSANDRA_HADOOP_MAX_KEY_SIZE_DEFAULT = 8192;
@@ -59,6 +55,7 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
     private String keyspace;
     private String cfName;
     private IPartitioner partitioner;
+    private Session session;
 
     protected void validateConfiguration(Configuration conf)
     {
@@ -72,57 +69,27 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
             throw new UnsupportedOperationException("You must set the Cassandra partitioner class with setInputPartitioner");
     }
 
-    public static Cassandra.Client createAuthenticatedClient(String location, int port, Configuration conf) throws Exception
-    {
-        logger.debug("Creating authenticated client for CF input format");
-        TTransport transport;
-        try
-        {
-            transport = ConfigHelper.getClientTransportFactory(conf).openTransport(location, port);
-        }
-        catch (Exception e)
-        {
-            throw new TTransportException("Failed to open a transport to " + location + ":" + port + ".", e);
-        }
-        TProtocol binaryProtocol = new TBinaryProtocol(transport, true, true);
-        Cassandra.Client client = new Cassandra.Client(binaryProtocol);
-
-        // log in
-        client.set_keyspace(ConfigHelper.getInputKeyspace(conf));
-        if ((ConfigHelper.getInputKeyspaceUserName(conf) != null) && (ConfigHelper.getInputKeyspacePassword(conf) != null))
-        {
-            Map<String, String> creds = new HashMap<String, String>();
-            creds.put(PasswordAuthenticator.USERNAME_KEY, ConfigHelper.getInputKeyspaceUserName(conf));
-            creds.put(PasswordAuthenticator.PASSWORD_KEY, ConfigHelper.getInputKeyspacePassword(conf));
-            AuthenticationRequest authRequest = new AuthenticationRequest(creds);
-            client.login(authRequest);
-        }
-        logger.debug("Authenticated client for CF input format created successfully");
-        return client;
-    }
-
     public List<InputSplit> getSplits(JobContext context) throws IOException
     {
-        Configuration conf = HadoopCompat.getConfiguration(context);;
+        Configuration conf = HadoopCompat.getConfiguration(context);
 
         validateConfiguration(conf);
 
-        // cannonical ranges and nodes holding replicas
-        List<TokenRange> masterRangeNodes = getRangeMap(conf);
-
         keyspace = ConfigHelper.getInputKeyspace(conf);
         cfName = ConfigHelper.getInputColumnFamily(conf);
         partitioner = ConfigHelper.getInputPartitioner(conf);
         logger.debug("partitioner is {}", partitioner);
 
+        // canonical ranges and nodes holding replicas
+        Map<TokenRange, Set<Host>> masterRangeNodes = getRangeMap(conf, keyspace);
 
-        // cannonical ranges, split into pieces, fetching the splits in parallel
+        // canonical ranges, split into pieces, fetching the splits in parallel
         ExecutorService executor = new ThreadPoolExecutor(0, 128, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
-        List<InputSplit> splits = new ArrayList<InputSplit>();
+        List<InputSplit> splits = new ArrayList<>();
 
         try
         {
-            List<Future<List<InputSplit>>> splitfutures = new ArrayList<Future<List<InputSplit>>>();
+            List<Future<List<InputSplit>>> splitfutures = new ArrayList<>();
             KeyRange jobKeyRange = ConfigHelper.getInputKeyRange(conf);
             Range<Token> jobRange = null;
             if (jobKeyRange != null)
@@ -130,7 +97,7 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
                 if (jobKeyRange.start_key != null)
                 {
                     if (!partitioner.preservesOrder())
-                        throw new UnsupportedOperationException("KeyRange based on keys can only be used with a order preserving paritioner");
+                        throw new UnsupportedOperationException("KeyRange based on keys can only be used with a order preserving partitioner");
                     if (jobKeyRange.start_token != null)
                         throw new IllegalArgumentException("only start_key supported");
                     if (jobKeyRange.end_token != null)
@@ -149,26 +116,25 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
                 }
             }
 
-            for (TokenRange range : masterRangeNodes)
+            session = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf).connect();
+            Metadata metadata = session.getCluster().getMetadata();
+
+            for (TokenRange range : masterRangeNodes.keySet())
             {
                 if (jobRange == null)
                 {
-                    // for each range, pick a live owner and ask it to compute bite-sized splits
-                    splitfutures.add(executor.submit(new SplitCallable(range, conf)));
+                    // for each tokenRange, pick a live owner and ask it to compute bite-sized splits
+                    splitfutures.add(executor.submit(new SplitCallable(range, masterRangeNodes.get(range), conf)));
                 }
                 else
                 {
-                    Range<Token> dhtRange = new Range<Token>(partitioner.getTokenFactory().fromString(range.start_token),
-                                                             partitioner.getTokenFactory().fromString(range.end_token));
-
-                    if (dhtRange.intersects(jobRange))
+                    TokenRange jobTokenRange = rangeToTokenRange(metadata, jobRange);
+                    if (range.intersects(jobTokenRange))
                     {
-                        for (Range<Token> intersection: dhtRange.intersectionWith(jobRange))
+                        for (TokenRange intersection: range.intersectWith(jobTokenRange))
                         {
-                            range.start_token = partitioner.getTokenFactory().toString(intersection.left);
-                            range.end_token = partitioner.getTokenFactory().toString(intersection.right);
-                            // for each range, pick a live owner and ask it to compute bite-sized splits
-                            splitfutures.add(executor.submit(new SplitCallable(range, conf)));
+                            // for each tokenRange, pick a live owner and ask it to compute bite-sized splits
+                            splitfutures.add(executor.submit(new SplitCallable(intersection,  masterRangeNodes.get(range), conf)));
                         }
                     }
                 }
@@ -197,53 +163,53 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
         return splits;
     }
 
+    private TokenRange rangeToTokenRange(Metadata metadata, Range<Token> range)
+    {
+        return metadata.newTokenRange(metadata.newToken(partitioner.getTokenFactory().toString(range.left)),
+                metadata.newToken(partitioner.getTokenFactory().toString(range.right)));
+    }
+
     /**
-     * Gets a token range and splits it up according to the suggested
+     * Gets a token tokenRange and splits it up according to the suggested
      * size into input splits that Hadoop can use.
      */
     class SplitCallable implements Callable<List<InputSplit>>
     {
 
-        private final TokenRange range;
+        private final TokenRange tokenRange;
+        private final Set<Host> hosts;
         private final Configuration conf;
 
-        public SplitCallable(TokenRange tr, Configuration conf)
+        public SplitCallable(TokenRange tr, Set<Host> hosts, Configuration conf)
         {
-            this.range = tr;
+            this.tokenRange = tr;
+            this.hosts = hosts;
             this.conf = conf;
         }
 
         public List<InputSplit> call() throws Exception
         {
-            ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
-            List<CfSplit> subSplits = getSubSplits(keyspace, cfName, range, conf);
-            assert range.rpc_endpoints.size() == range.endpoints.size() : "rpc_endpoints size must match endpoints size";
+            ArrayList<InputSplit> splits = new ArrayList<>();
+            Map<TokenRange, Long> subSplits;
+            subSplits = getSubSplits(keyspace, cfName, tokenRange, conf);
             // turn the sub-ranges into InputSplits
-            String[] endpoints = range.endpoints.toArray(new String[range.endpoints.size()]);
+            String[] endpoints = new String[hosts.size()];
+
             // hadoop needs hostname, not ip
             int endpointIndex = 0;
-            for (String endpoint: range.rpc_endpoints)
-            {
-                String endpoint_address = endpoint;
-                if (endpoint_address == null || endpoint_address.equals("0.0.0.0"))
-                    endpoint_address = range.endpoints.get(endpointIndex);
-                endpoints[endpointIndex++] = InetAddress.getByName(endpoint_address).getHostName();
-            }
+            for (Host endpoint : hosts)
+                endpoints[endpointIndex++] = endpoint.getAddress().getHostName();
 
-            Token.TokenFactory factory = partitioner.getTokenFactory();
-            for (CfSplit subSplit : subSplits)
+            for (TokenRange subSplit : subSplits.keySet())
             {
-                Token left = factory.fromString(subSplit.getStart_token());
-                Token right = factory.fromString(subSplit.getEnd_token());
-                Range<Token> range = new Range<Token>(left, right);
-                List<Range<Token>> ranges = range.isWrapAround() ? range.unwrap() : ImmutableList.of(range);
-                for (Range<Token> subrange : ranges)
+                List<TokenRange> ranges = subSplit.unwrap();
+                for (TokenRange subrange : ranges)
                 {
                     ColumnFamilySplit split =
                             new ColumnFamilySplit(
-                                    factory.toString(subrange.left),
-                                    factory.toString(subrange.right),
-                                    subSplit.getRow_count(),
+                                    subrange.getStart().toString().substring(2),
+                                    subrange.getEnd().toString().substring(2),
+                                    subSplits.get(subSplit),
                                     endpoints);
 
                     logger.debug("adding {}", split);
@@ -254,80 +220,63 @@ public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<
         }
     }
 
-    private List<CfSplit> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException
+    private Map<TokenRange, Long> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException
     {
-        int splitsize = ConfigHelper.getInputSplitSize(conf);
-        for (int i = 0; i < range.rpc_endpoints.size(); i++)
+        int splitSize = ConfigHelper.getInputSplitSize(conf);
+        try
         {
-            String host = range.rpc_endpoints.get(i);
-
-            if (host == null || host.equals("0.0.0.0"))
-                host = range.endpoints.get(i);
-
-            try
-            {
-                Cassandra.Client client = ConfigHelper.createConnection(conf, host, ConfigHelper.getInputRpcPort(conf));
-                client.set_keyspace(keyspace);
-
-                try
-                {
-                    return client.describe_splits_ex(cfName, range.start_token, range.end_token, splitsize);
-                }
-                catch (TApplicationException e)
-                {
-                    // fallback to guessing split size if talking to a server without describe_splits_ex method
-                    if (e.getType() == TApplicationException.UNKNOWN_METHOD)
-                    {
-                        List<String> splitPoints = client.describe_splits(cfName, range.start_token, range.end_token, splitsize);
-                        return tokenListToSplits(splitPoints, splitsize);
-                    }
-                    throw e;
-                }
-            }
-            catch (IOException e)
-            {
-                logger.debug("failed connect to endpoint {}", host, e);
-            }
-            catch (InvalidRequestException e)
-            {
-                throw new RuntimeException(e);
-            }
-            catch (TException e)
-            {
-                throw new RuntimeException(e);
-            }
+            return describeSplits(keyspace, cfName, range, splitSize);
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
         }
-        throw new IOException("failed connecting to all endpoints " + StringUtils.join(range.endpoints, ","));
     }
 
-    private List<CfSplit> tokenListToSplits(List<String> splitTokens, int splitsize)
+    private Map<TokenRange, Set<Host>> getRangeMap(Configuration conf, String keyspace)
     {
-        List<CfSplit> splits = Lists.newArrayListWithExpectedSize(splitTokens.size() - 1);
-        for (int j = 0; j < splitTokens.size() - 1; j++)
-            splits.add(new CfSplit(splitTokens.get(j), splitTokens.get(j + 1), splitsize));
-        return splits;
+        Session session = CqlConfigHelper.getInputCluster(ConfigHelper.getInputInitialAddress(conf).split(","), conf).connect();
+
+        Map<TokenRange, Set<Host>> map = new HashMap<>();
+        Metadata metadata = session.getCluster().getMetadata();
+        for (TokenRange tokenRange : metadata.getTokenRanges())
+            map.put(tokenRange, metadata.getReplicas(keyspace, tokenRange));
+        return map;
     }
 
-    private List<TokenRange> getRangeMap(Configuration conf) throws IOException
+    private Map<TokenRange, Long> describeSplits(String keyspace, String table, TokenRange tokenRange, int splitSize)
     {
-        Cassandra.Client client = ConfigHelper.getClientFromInputAddressList(conf);
-
-        List<TokenRange> map;
-        try
+        String query = String.format("SELECT mean_partition_size, partitions_count " +
+                                     "FROM %s.%s " +
+                                     "WHERE keyspace_name = ? AND table_name = ? AND range_start = ? AND range_end = ?",
+                                     SystemKeyspace.NAME,
+                                     SystemKeyspace.SIZE_ESTIMATES);
+
+        ResultSet resultSet = session.execute(query, keyspace, table, tokenRange.getStart().toString(), tokenRange.getEnd().toString());
+
+        Row row = resultSet.one();
+        // If we have no data on this split, return the full split i.e., do not sub-split
+        // Assume smallest granularity of partition count available from CASSANDRA-7688
+        if (row == null)
         {
-            map = client.describe_local_ring(ConfigHelper.getInputKeyspace(conf));
+            Map<TokenRange, Long> wrappedTokenRange = new HashMap<>();
+            wrappedTokenRange.put(tokenRange, (long) 128);
+            return wrappedTokenRange;
         }
-        catch (TException e)
-        {
-            throw new RuntimeException(e);
-        }
-        
-        return map;
+
+        long meanPartitionSize = row.getLong("mean_partition_size");
+        long partitionCount = row.getLong("partitions_count");
+
+        int splitCount = (int)((meanPartitionSize * partitionCount) / splitSize);
+        List<TokenRange> splitRanges = tokenRange.splitEvenly(splitCount);
+        Map<TokenRange, Long> rangesWithLength = new HashMap<>();
+        for (TokenRange range : splitRanges)
+            rangesWithLength.put(range, partitionCount/splitCount);
+
+        return rangesWithLength;
     }
 
-    //
     // Old Hadoop API
-    //
     public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException
     {
         TaskAttemptContext tac = HadoopCompat.newTaskAttemptContext(jobConf, new TaskAttemptID());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java
deleted file mode 100644
index 03d0045..0000000
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.hadoop;
-
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.auth.PasswordAuthenticator;
-import org.apache.cassandra.thrift.AuthenticationRequest;
-import org.apache.cassandra.thrift.Cassandra;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TTransport;
-
-/**
- * The <code>ColumnFamilyOutputFormat</code> acts as a Hadoop-specific
- * OutputFormat that allows reduce tasks to store keys (and corresponding
- * values) as Cassandra rows (and respective columns) in a given
- * ColumnFamily.
- *
- * <p>
- * As is the case with the {@link ColumnFamilyInputFormat}, you need to set the
- * Keyspace and ColumnFamily in your
- * Hadoop job Configuration. The {@link ConfigHelper} class, through its
- * {@link ConfigHelper#setOutputColumnFamily} method, is provided to make this
- * simple.
- * </p>
- *
- * <p>
- * For the sake of performance, this class employs a lazy write-back caching
- * mechanism, where its record writer batches mutations created based on the
- * reduce's inputs (in a task-specific map), and periodically makes the changes
- * official by sending a batch mutate request to Cassandra.
- * </p>
- * @param <Y>
- */
-public abstract class AbstractColumnFamilyOutputFormat<K, Y> extends OutputFormat<K, Y> implements org.apache.hadoop.mapred.OutputFormat<K, Y>
-{
-    public static final String BATCH_THRESHOLD = "mapreduce.output.columnfamilyoutputformat.batch.threshold";
-    public static final String QUEUE_SIZE = "mapreduce.output.columnfamilyoutputformat.queue.size";
-    private static final Logger logger = LoggerFactory.getLogger(AbstractColumnFamilyOutputFormat.class);
-
-
-    /**
-     * Check for validity of the output-specification for the job.
-     *
-     * @param context
-     *            information about the job
-     */
-    public void checkOutputSpecs(JobContext context)
-    {
-        checkOutputSpecs(HadoopCompat.getConfiguration(context));
-    }
-
-    protected void checkOutputSpecs(Configuration conf)
-    {
-        if (ConfigHelper.getOutputKeyspace(conf) == null)
-            throw new UnsupportedOperationException("You must set the keyspace with setOutputKeyspace()");
-        if (ConfigHelper.getOutputPartitioner(conf) == null)
-            throw new UnsupportedOperationException("You must set the output partitioner to the one used by your Cassandra cluster");
-        if (ConfigHelper.getOutputInitialAddress(conf) == null)
-            throw new UnsupportedOperationException("You must set the initial output address to a Cassandra node");
-    }
-
-    /** Fills the deprecated OutputFormat interface for streaming. */
-    @Deprecated
-    public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
-    {
-        checkOutputSpecs(job);
-    }
-
-    /**
-     * The OutputCommitter for this format does not write any data to the DFS.
-     *
-     * @param context
-     *            the task context
-     * @return an output committer
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
-    {
-        return new NullOutputCommitter();
-    }
-
-    /**
-     * Connects to the given server:port and returns a client based on the given socket that points to the configured
-     * keyspace, and is logged in with the configured credentials.
-     *
-     * @param host fully qualified host name to connect to
-     * @param port RPC port of the server
-     * @param conf a job configuration
-     * @return a cassandra client
-     * @throws Exception set of thrown exceptions may be implementation defined,
-     *                   depending on the used transport factory
-     */
-    public static Cassandra.Client createAuthenticatedClient(String host, int port, Configuration conf) throws Exception
-    {
-        logger.debug("Creating authenticated client for CF output format");
-        TTransport transport = ConfigHelper.getClientTransportFactory(conf).openTransport(host, port);
-        TProtocol binaryProtocol = new TBinaryProtocol(transport, true, true);
-        Cassandra.Client client = new Cassandra.Client(binaryProtocol);
-        client.set_keyspace(ConfigHelper.getOutputKeyspace(conf));
-        String user = ConfigHelper.getOutputKeyspaceUserName(conf);
-        String password = ConfigHelper.getOutputKeyspacePassword(conf);
-        if ((user != null) && (password != null))
-            login(user, password, client);
-
-        logger.debug("Authenticated client for CF output format created successfully");
-        return client;
-    }
-
-    public static void login(String user, String password, Cassandra.Client client) throws Exception
-    {
-        Map<String, String> creds = new HashMap<String, String>();
-        creds.put(PasswordAuthenticator.USERNAME_KEY, user);
-        creds.put(PasswordAuthenticator.PASSWORD_KEY, password);
-        AuthenticationRequest authRequest = new AuthenticationRequest(creds);
-        client.login(authRequest);
-    }
-
-    /**
-     * An {@link OutputCommitter} that does nothing.
-     */
-    private static class NullOutputCommitter extends OutputCommitter
-    {
-        public void abortTask(TaskAttemptContext taskContext) { }
-
-        public void cleanupJob(JobContext jobContext) { }
-
-        public void commitTask(TaskAttemptContext taskContext) { }
-
-        public boolean needsTaskCommit(TaskAttemptContext taskContext)
-        {
-            return false;
-        }
-
-        public void setupJob(JobContext jobContext) { }
-
-        public void setupTask(TaskAttemptContext taskContext) { }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java
deleted file mode 100644
index cb44beb..0000000
--- a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.hadoop;
-
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.util.*;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.cassandra.client.RingCache;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.thrift.transport.TTransport;
-import org.apache.hadoop.util.Progressable;
-
-
-/**
- * The <code>ColumnFamilyRecordWriter</code> maps the output &lt;key, value&gt;
- * pairs to a Cassandra column family. In particular, it applies all mutations
- * in the value, which it associates with the key, and in turn the responsible
- * endpoint.
- *
- * <p>
- * Furthermore, this writer groups the mutations by the endpoint responsible for
- * the rows being affected. This allows the mutations to be executed in parallel,
- * directly to a responsible endpoint.
- * </p>
- *
- * @see ColumnFamilyOutputFormat
- */
-public abstract class AbstractColumnFamilyRecordWriter<K, Y> extends RecordWriter<K, Y> implements org.apache.hadoop.mapred.RecordWriter<K, Y>
-{
-    // The configuration this writer is associated with.
-    protected final Configuration conf;
-
-    // The ring cache that describes the token ranges each node in the ring is
-    // responsible for. This is what allows us to group the mutations by
-    // the endpoints they should be targeted at. The targeted endpoint
-    // essentially
-    // acts as the primary replica for the rows being affected by the mutations.
-    protected final RingCache ringCache;
-
-    // The number of mutations to buffer per endpoint
-    protected final int queueSize;
-
-    protected final long batchThreshold;
-
-    protected final ConsistencyLevel consistencyLevel;
-    protected Progressable progressable;
-    protected TaskAttemptContext context;
-
-    protected AbstractColumnFamilyRecordWriter(Configuration conf)
-    {
-        this.conf = conf;
-        this.ringCache = new RingCache(conf);
-        this.queueSize = conf.getInt(AbstractColumnFamilyOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors());
-        batchThreshold = conf.getLong(AbstractColumnFamilyOutputFormat.BATCH_THRESHOLD, 32);
-        consistencyLevel = ConsistencyLevel.valueOf(ConfigHelper.getWriteConsistencyLevel(conf));
-    }
-    
-    /**
-     * Close this <code>RecordWriter</code> to future operations, but not before
-     * flushing out the batched mutations.
-     *
-     * @param context the context of the task
-     * @throws IOException
-     */
-    public void close(TaskAttemptContext context) throws IOException, InterruptedException
-    {
-        close();
-    }
-
-    /** Fills the deprecated RecordWriter interface for streaming. */
-    @Deprecated
-    public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
-    {
-        close();
-    }
-    
-    protected abstract void close() throws IOException;
-
-    /**
-     * A client that runs in a threadpool and connects to the list of endpoints for a particular
-     * range. Mutations for keys in that range are sent to this client via a queue.
-     */
-    public abstract class AbstractRangeClient<K> extends Thread
-    {
-        // The list of endpoints for this range
-        protected final List<InetAddress> endpoints;
-        // A bounded queue of incoming mutations for this range
-        protected final BlockingQueue<K> queue = new ArrayBlockingQueue<K>(queueSize);
-
-        protected volatile boolean run = true;
-        // we want the caller to know if something went wrong, so we record any unrecoverable exception while writing
-        // so we can throw it on the caller's stack when he calls put() again, or if there are no more put calls,
-        // when the client is closed.
-        protected volatile IOException lastException;
-
-        protected Cassandra.Client client;
-
-        /**
-         * Constructs an {@link AbstractRangeClient} for the given endpoints.
-         * @param endpoints the possible endpoints to execute the mutations on
-         */
-        public AbstractRangeClient(List<InetAddress> endpoints)
-        {
-            super("client-" + endpoints);
-            this.endpoints = endpoints;
-         }
-
-        /**
-         * enqueues the given value to Cassandra
-         */
-        public void put(K value) throws IOException
-        {
-            while (true)
-            {
-                if (lastException != null)
-                    throw lastException;
-                try
-                {
-                    if (queue.offer(value, 100, TimeUnit.MILLISECONDS))
-                        break;
-                }
-                catch (InterruptedException e)
-                {
-                    throw new AssertionError(e);
-                }
-            }
-        }
-
-        public void close() throws IOException
-        {
-            // stop the run loop.  this will result in closeInternal being called by the time join() finishes.
-            run = false;
-            interrupt();
-            try
-            {
-                this.join();
-            }
-            catch (InterruptedException e)
-            {
-                throw new AssertionError(e);
-            }
-
-            if (lastException != null)
-                throw lastException;
-        }
-
-        protected void closeInternal()
-        {
-            if (client != null)
-            {
-                TTransport transport = client.getOutputProtocol().getTransport();
-                if (transport.isOpen())
-                    transport.close();
-            }
-        }
-
-        /**
-         * Loops collecting mutations from the queue and sending to Cassandra
-         */
-        public abstract void run();
-
-        @Override
-        public String toString()
-        {
-            return "#<Client for " + endpoints + ">";
-        }
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
index f5a5a8d..5282279 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
@@ -23,9 +23,12 @@ import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.cassandra.thrift.Mutation;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.*;
 
-public class BulkOutputFormat extends AbstractBulkOutputFormat<ByteBuffer,List<Mutation>>
+@Deprecated
+public class BulkOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>>
+        implements org.apache.hadoop.mapred.OutputFormat<ByteBuffer,List<Mutation>>
 {
     /** Fills the deprecated OutputFormat interface for streaming. */
     @Deprecated
@@ -39,4 +42,50 @@ public class BulkOutputFormat extends AbstractBulkOutputFormat<ByteBuffer,List<M
     {
         return new BulkRecordWriter(context);
     }
+
+
+    @Override
+    public void checkOutputSpecs(JobContext context)
+    {
+        checkOutputSpecs(HadoopCompat.getConfiguration(context));
+    }
+
+    private void checkOutputSpecs(Configuration conf)
+    {
+        if (ConfigHelper.getOutputKeyspace(conf) == null)
+        {
+            throw new UnsupportedOperationException("you must set the keyspace with setColumnFamily()");
+        }
+    }
+
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
+    {
+        return new NullOutputCommitter();
+    }
+
+    /** Fills the deprecated OutputFormat interface for streaming. */
+    @Deprecated
+    public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
+    {
+        checkOutputSpecs(job);
+    }
+
+    public static class NullOutputCommitter extends OutputCommitter
+    {
+        public void abortTask(TaskAttemptContext taskContext) { }
+
+        public void cleanupJob(JobContext jobContext) { }
+
+        public void commitTask(TaskAttemptContext taskContext) { }
+
+        public boolean needsTaskCommit(TaskAttemptContext taskContext)
+        {
+            return false;
+        }
+
+        public void setupJob(JobContext jobContext) { }
+
+        public void setupTask(TaskAttemptContext taskContext) { }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
index d67b856..6b9ecb5 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
@@ -17,24 +17,57 @@
  */
 package org.apache.cassandra.hadoop;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
-import java.util.List;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
 import org.apache.cassandra.io.sstable.SSTableLoader;
 import org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter;
+import org.apache.cassandra.streaming.StreamState;
 import org.apache.cassandra.thrift.Column;
 import org.apache.cassandra.thrift.CounterColumn;
 import org.apache.cassandra.thrift.Mutation;
+import org.apache.cassandra.utils.NativeSSTableLoaderClient;
+import org.apache.cassandra.utils.OutputHandler;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.util.Progressable;
 
-public final class BulkRecordWriter extends AbstractBulkRecordWriter<ByteBuffer, List<Mutation>>
+@Deprecated
+public final class BulkRecordWriter extends RecordWriter<ByteBuffer, List<Mutation>>
+        implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer, List<Mutation>>
 {
+    public final static String OUTPUT_LOCATION = "mapreduce.output.bulkoutputformat.localdir";
+    public final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize";
+    public final static String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits";
+    public final static String MAX_FAILED_HOSTS = "mapreduce.output.bulkoutputformat.maxfailedhosts";
+
+    private final Logger logger = LoggerFactory.getLogger(BulkRecordWriter.class);
+
+    protected final Configuration conf;
+    protected final int maxFailures;
+    protected final int bufferSize;
+    protected Closeable writer;
+    protected SSTableLoader loader;
+    protected Progressable progress;
+    protected TaskAttemptContext context;
     private File outputDir;
     
     
@@ -55,17 +88,32 @@ public final class BulkRecordWriter extends AbstractBulkRecordWriter<ByteBuffer,
 
     BulkRecordWriter(TaskAttemptContext context)
     {
-        super(context);
+
+        this(HadoopCompat.getConfiguration(context));
+        this.context = context;
     }
 
     BulkRecordWriter(Configuration conf, Progressable progress)
     {
-        super(conf, progress);
+        this(conf);
+        this.progress = progress;
     }
 
     BulkRecordWriter(Configuration conf)
     {
-        super(conf);
+        Config.setOutboundBindAny(true);
+        this.conf = conf;
+        DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.parseInt(conf.get(STREAM_THROTTLE_MBITS, "0")));
+        maxFailures = Integer.parseInt(conf.get(MAX_FAILED_HOSTS, "0"));
+        bufferSize = Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64"));
+    }
+
+    protected String getOutputLocation() throws IOException
+    {
+        String dir = conf.get(OUTPUT_LOCATION, System.getProperty("java.io.tmpdir"));
+        if (dir == null)
+            throw new IOException("Output directory not defined, if hadoop is not setting java.io.tmpdir then define " + OUTPUT_LOCATION);
+        return dir;
     }
 
     private void setTypes(Mutation mutation)
@@ -115,6 +163,54 @@ public final class BulkRecordWriter extends AbstractBulkRecordWriter<ByteBuffer,
     }
 
     @Override
+    public void close(TaskAttemptContext context) throws IOException, InterruptedException
+    {
+        close();
+    }
+
+    /** Fills the deprecated RecordWriter interface for streaming. */
+    @Deprecated
+    public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
+    {
+        close();
+    }
+
+    private void close() throws IOException
+    {
+        if (writer != null)
+        {
+            writer.close();
+            Future<StreamState> future = loader.stream();
+            while (true)
+            {
+                try
+                {
+                    future.get(1000, TimeUnit.MILLISECONDS);
+                    break;
+                }
+                catch (ExecutionException | TimeoutException te)
+                {
+                    if (null != progress)
+                        progress.progress();
+                    if (null != context)
+                        HadoopCompat.progress(context);
+                }
+                catch (InterruptedException e)
+                {
+                    throw new IOException(e);
+                }
+            }
+            if (loader.getFailedHosts().size() > 0)
+            {
+                if (loader.getFailedHosts().size() > maxFailures)
+                    throw new IOException("Too many hosts failed: " + loader.getFailedHosts());
+                else
+                    logger.warn("Some hosts failed: {}", loader.getFailedHosts());
+            }
+        }
+    }
+
+    @Override
     public void write(ByteBuffer keybuff, List<Mutation> value) throws IOException
     {
         setTypes(value.get(0));
@@ -158,4 +254,43 @@ public final class BulkRecordWriter extends AbstractBulkRecordWriter<ByteBuffer,
                 HadoopCompat.progress(context);
         }
     }
+
+    public static class ExternalClient extends NativeSSTableLoaderClient
+    {
+        public ExternalClient(Configuration conf)
+        {
+            super(resolveHostAddresses(conf),
+                  CqlConfigHelper.getOutputNativePort(conf),
+                  ConfigHelper.getOutputKeyspaceUserName(conf),
+                  ConfigHelper.getOutputKeyspacePassword(conf),
+                  CqlConfigHelper.getSSLOptions(conf).orNull());
+        }
+
+        private static Collection<InetAddress> resolveHostAddresses(Configuration conf)
+        {
+            Set<InetAddress> addresses = new HashSet<>();
+
+            for (String host : ConfigHelper.getOutputInitialAddress(conf).split(","))
+            {
+                try
+                {
+                    addresses.add(InetAddress.getByName(host));
+                }
+                catch (UnknownHostException e)
+                {
+                    throw new RuntimeException(e);
+                }
+            }
+
+            return addresses;
+        }
+    }
+
+    public static class NullOutputHandler implements OutputHandler
+    {
+        public void output(String msg) {}
+        public void debug(String msg) {}
+        public void warn(String msg) {}
+        public void warn(String msg, Throwable th) {}
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
index 686d486..88dd2e2 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
@@ -21,11 +21,23 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.*;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.auth.PasswordAuthenticator;
 import org.apache.cassandra.db.Cell;
+import org.apache.cassandra.thrift.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
 
 /**
  * Hadoop InputFormat allowing map/reduce against Cassandra rows within one ColumnFamily.
@@ -44,9 +56,40 @@ import org.apache.hadoop.mapreduce.*;
  *
  * The default split size is 64k rows.
  */
+@Deprecated
 public class ColumnFamilyInputFormat extends AbstractColumnFamilyInputFormat<ByteBuffer, SortedMap<ByteBuffer, Cell>>
 {
-    
+    private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyInputFormat.class);
+
+    public static Cassandra.Client createAuthenticatedClient(String location, int port, Configuration conf) throws Exception
+    {
+        logger.debug("Creating authenticated client for CF input format");
+        TTransport transport;
+        try
+        {
+            transport = ConfigHelper.getClientTransportFactory(conf).openTransport(location, port);
+        }
+        catch (Exception e)
+        {
+            throw new TTransportException("Failed to open a transport to " + location + ":" + port + ".", e);
+        }
+        TProtocol binaryProtocol = new TBinaryProtocol(transport, true, true);
+        Cassandra.Client client = new Cassandra.Client(binaryProtocol);
+
+        // log in
+        client.set_keyspace(ConfigHelper.getInputKeyspace(conf));
+        if ((ConfigHelper.getInputKeyspaceUserName(conf) != null) && (ConfigHelper.getInputKeyspacePassword(conf) != null))
+        {
+            Map<String, String> creds = new HashMap<String, String>();
+            creds.put(PasswordAuthenticator.USERNAME_KEY, ConfigHelper.getInputKeyspaceUserName(conf));
+            creds.put(PasswordAuthenticator.PASSWORD_KEY, ConfigHelper.getInputKeyspacePassword(conf));
+            AuthenticationRequest authRequest = new AuthenticationRequest(creds);
+            client.login(authRequest);
+        }
+        logger.debug("Authenticated client for CF input format created successfully");
+        return client;
+    }
+
     public RecordReader<ByteBuffer, SortedMap<ByteBuffer, Cell>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
     {
         return new ColumnFamilyRecordReader();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
index 2990bf3..94ced69 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyOutputFormat.java
@@ -18,10 +18,18 @@
 package org.apache.cassandra.hadoop;
 
 
+import java.io.*;
 import java.nio.ByteBuffer;
-import java.util.List;
+import java.util.*;
+
+import org.slf4j.*;
+
+import org.apache.cassandra.auth.*;
 import org.apache.cassandra.thrift.*;
+import org.apache.hadoop.conf.*;
 import org.apache.hadoop.mapreduce.*;
+import org.apache.thrift.protocol.*;
+import org.apache.thrift.transport.*;
 
 /**
  * The <code>ColumnFamilyOutputFormat</code> acts as a Hadoop-specific
@@ -44,8 +52,93 @@ import org.apache.hadoop.mapreduce.*;
  * official by sending a batch mutate request to Cassandra.
  * </p>
  */
-public class ColumnFamilyOutputFormat extends AbstractColumnFamilyOutputFormat<ByteBuffer,List<Mutation>>
+@Deprecated
+public class ColumnFamilyOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>>
+        implements org.apache.hadoop.mapred.OutputFormat<ByteBuffer,List<Mutation>>
 {
+    public static final String BATCH_THRESHOLD = "mapreduce.output.columnfamilyoutputformat.batch.threshold";
+    public static final String QUEUE_SIZE = "mapreduce.output.columnfamilyoutputformat.queue.size";
+
+    private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyOutputFormat.class);
+
+    /**
+     * The OutputCommitter for this format does not write any data to the DFS.
+     *
+     * @param context
+     *            the task context
+     * @return an output committer
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
+    {
+        return new NullOutputCommitter();
+    }
+
+    /**
+     * Check for validity of the output-specification for the job.
+     *
+     * @param context
+     *            information about the job
+     */
+    public void checkOutputSpecs(JobContext context)
+    {
+        checkOutputSpecs(HadoopCompat.getConfiguration(context));
+    }
+
+    protected void checkOutputSpecs(Configuration conf)
+    {
+        if (ConfigHelper.getOutputKeyspace(conf) == null)
+            throw new UnsupportedOperationException("You must set the keyspace with setOutputKeyspace()");
+        if (ConfigHelper.getOutputPartitioner(conf) == null)
+            throw new UnsupportedOperationException("You must set the output partitioner to the one used by your Cassandra cluster");
+        if (ConfigHelper.getOutputInitialAddress(conf) == null)
+            throw new UnsupportedOperationException("You must set the initial output address to a Cassandra node");
+    }
+
+    /** Fills the deprecated OutputFormat interface for streaming. */
+    @Deprecated
+    public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
+    {
+        checkOutputSpecs(job);
+    }
+
+    /**
+     * Connects to the given server:port and returns a client based on the given socket that points to the configured
+     * keyspace, and is logged in with the configured credentials.
+     *
+     * @param host fully qualified host name to connect to
+     * @param port RPC port of the server
+     * @param conf a job configuration
+     * @return a cassandra client
+     * @throws Exception set of thrown exceptions may be implementation defined,
+     *                   depending on the used transport factory
+     */
+    public static Cassandra.Client createAuthenticatedClient(String host, int port, Configuration conf) throws Exception
+    {
+        logger.debug("Creating authenticated client for CF output format");
+        TTransport transport = ConfigHelper.getClientTransportFactory(conf).openTransport(host, port);
+        TProtocol binaryProtocol = new TBinaryProtocol(transport, true, true);
+        Cassandra.Client client = new Cassandra.Client(binaryProtocol);
+        client.set_keyspace(ConfigHelper.getOutputKeyspace(conf));
+        String user = ConfigHelper.getOutputKeyspaceUserName(conf);
+        String password = ConfigHelper.getOutputKeyspacePassword(conf);
+        if ((user != null) && (password != null))
+            login(user, password, client);
+
+        logger.debug("Authenticated client for CF output format created successfully");
+        return client;
+    }
+
+    public static void login(String user, String password, Cassandra.Client client) throws Exception
+    {
+        Map<String, String> creds = new HashMap<String, String>();
+        creds.put(PasswordAuthenticator.USERNAME_KEY, user);
+        creds.put(PasswordAuthenticator.PASSWORD_KEY, password);
+        AuthenticationRequest authRequest = new AuthenticationRequest(creds);
+        client.login(authRequest);
+    }
+
     /** Fills the deprecated OutputFormat interface for streaming. */
     @Deprecated
     public ColumnFamilyRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress)
@@ -64,4 +157,26 @@ public class ColumnFamilyOutputFormat extends AbstractColumnFamilyOutputFormat<B
     {
         return new ColumnFamilyRecordWriter(context);
     }
+
+    /**
+     * An {@link OutputCommitter} that does nothing.
+     */
+    private static class NullOutputCommitter extends OutputCommitter
+    {
+        public void abortTask(TaskAttemptContext taskContext) { }
+
+        public void cleanupJob(JobContext jobContext) { }
+
+        public void commitTask(TaskAttemptContext taskContext) { }
+
+        public boolean needsTaskCommit(TaskAttemptContext taskContext)
+        {
+            return false;
+        }
+
+        public void setupJob(JobContext jobContext) { }
+
+        public void setupTask(TaskAttemptContext taskContext) { }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
index 35437e9..d205f13 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordReader.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.thrift.TException;
 import org.apache.thrift.transport.TTransport;
 
+@Deprecated
 public class ColumnFamilyRecordReader extends RecordReader<ByteBuffer, SortedMap<ByteBuffer, Cell>>
     implements org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, Cell>>
 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f698cc22/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
index d6a873b..31c7047 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyRecordWriter.java
@@ -22,15 +22,19 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.concurrent.*;
 
+import org.apache.cassandra.client.*;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.utils.*;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.*;
 import org.apache.thrift.TException;
 import org.apache.hadoop.util.Progressable;
+import org.apache.thrift.transport.*;
 
 
 /**
@@ -47,10 +51,30 @@ import org.apache.hadoop.util.Progressable;
  *
  * @see ColumnFamilyOutputFormat
  */
-final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<ByteBuffer, List<Mutation>>
+@Deprecated
+final class ColumnFamilyRecordWriter extends RecordWriter<ByteBuffer, List<Mutation>> implements
+        org.apache.hadoop.mapred.RecordWriter<ByteBuffer, List<Mutation>>
 {
+    // The configuration this writer is associated with.
+    protected final Configuration conf;
+
+    // The number of mutations to buffer per endpoint
+    protected final int queueSize;
+
+    protected final long batchThreshold;
+
+    protected final ConsistencyLevel consistencyLevel;
+    protected Progressable progressable;
+    protected TaskAttemptContext context;
     // handles for clients for each range running in the threadpool
     private final Map<Range, RangeClient> clients;
+
+    // The ring cache that describes the token ranges each node in the ring is
+    // responsible for. This is what allows us to group the mutations by
+    // the endpoints they should be targeted at. The targeted endpoint
+    // essentially
+    // acts as the primary replica for the rows being affected by the mutations.
+    private final RingCache ringCache;
     
     /**
      * Upon construction, obtain the map that this writer will use to collect
@@ -73,11 +97,33 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<By
 
     ColumnFamilyRecordWriter(Configuration conf)
     {
-        super(conf);
+        this.conf = conf;
+        this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors());
+        batchThreshold = conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32);
+        consistencyLevel = ConsistencyLevel.valueOf(ConfigHelper.getWriteConsistencyLevel(conf));
+        this.ringCache = new RingCache(conf);
         this.clients = new HashMap<Range, RangeClient>();
     }
-    
-    @Override
+
+    /**
+     * Close this <code>RecordWriter</code> to future operations, but not before
+     * flushing out the batched mutations.
+     *
+     * @param context the context of the task
+     * @throws IOException
+     */
+    public void close(TaskAttemptContext context) throws IOException, InterruptedException
+    {
+        close();
+    }
+
+    /** Fills the deprecated RecordWriter interface for streaming. */
+    @Deprecated
+    public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
+    {
+        close();
+    }
+
     public void close() throws IOException
     {
         // close all the clients before throwing anything
@@ -138,8 +184,20 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<By
      * A client that runs in a threadpool and connects to the list of endpoints for a particular
      * range. Mutations for keys in that range are sent to this client via a queue.
      */
-    public class RangeClient extends AbstractRangeClient<Pair<ByteBuffer, Mutation>>
+    public class RangeClient extends Thread
     {
+        // The list of endpoints for this range
+        protected final List<InetAddress> endpoints;
+        // A bounded queue of incoming mutations for this range
+        protected final BlockingQueue<Pair<ByteBuffer, Mutation>> queue = new ArrayBlockingQueue<>(queueSize);
+
+        protected volatile boolean run = true;
+        // we want the caller to know if something went wrong, so we record any unrecoverable exception while writing
+        // so we can throw it on the caller's stack when he calls put() again, or if there are no more put calls,
+        // when the client is closed.
+        protected volatile IOException lastException;
+
+        protected Cassandra.Client client;
         public final String columnFamily = ConfigHelper.getOutputColumnFamily(conf);
         
         /**
@@ -148,8 +206,58 @@ final class ColumnFamilyRecordWriter extends AbstractColumnFamilyRecordWriter<By
         */
         public RangeClient(List<InetAddress> endpoints)
         {
-            super(endpoints);
+            super("client-" + endpoints);
+            this.endpoints = endpoints;
          }
+
+        /**
+         * enqueues the given value to Cassandra
+         */
+        public void put(Pair<ByteBuffer, Mutation> value) throws IOException
+        {
+            while (true)
+            {
+                if (lastException != null)
+                    throw lastException;
+                try
+                {
+                    if (queue.offer(value, 100, TimeUnit.MILLISECONDS))
+                        break;
+                }
+                catch (InterruptedException e)
+                {
+                    throw new AssertionError(e);
+                }
+            }
+        }
+
+        public void close() throws IOException
+        {
+            // stop the run loop.  this will result in closeInternal being called by the time join() finishes.
+            run = false;
+            interrupt();
+            try
+            {
+                this.join();
+            }
+            catch (InterruptedException e)
+            {
+                throw new AssertionError(e);
+            }
+
+            if (lastException != null)
+                throw lastException;
+        }
+
+        protected void closeInternal()
+        {
+            if (client != null)
+            {
+                TTransport transport = client.getOutputProtocol().getTransport();
+                if (transport.isOpen())
+                    transport.close();
+            }
+        }
         
         /**
          * Loops collecting mutations from the queue and sending to Cassandra