You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2014/08/13 21:15:12 UTC

git commit: backport CASSANDRA-6927 to 2.0

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.0 efde6ae81 -> 44764c03e


backport CASSANDRA-6927 to 2.0


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/44764c03
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/44764c03
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/44764c03

Branch: refs/heads/cassandra-2.0
Commit: 44764c03e892bfdb7294ad32d9ff703909186917
Parents: efde6ae
Author: Brandon Williams <br...@apache.org>
Authored: Wed Aug 13 14:14:38 2014 -0500
Committer: Brandon Williams <br...@apache.org>
Committed: Wed Aug 13 14:14:38 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../hadoop/AbstractBulkOutputFormat.java        |  73 ++++++
 .../hadoop/AbstractBulkRecordWriter.java        | 251 ++++++++++++++++++
 .../cassandra/hadoop/BulkOutputFormat.java      |  49 +---
 .../cassandra/hadoop/BulkRecordWriter.java      | 259 ++-----------------
 .../hadoop/cql3/CqlBulkOutputFormat.java        | 106 ++++++++
 .../hadoop/cql3/CqlBulkRecordWriter.java        | 199 ++++++++++++++
 .../cassandra/hadoop/cql3/CqlConfigHelper.java  |   2 +-
 .../cassandra/hadoop/cql3/CqlOutputFormat.java  |   2 +-
 .../io/sstable/AbstractSSTableSimpleWriter.java |  10 +-
 .../cassandra/io/sstable/CQLSSTableWriter.java  |   3 +-
 11 files changed, 663 insertions(+), 292 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/44764c03/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3d31948..1ac22f9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.10
+ * (Hadoop) Add CqlOutputFormat (CASSANDRA-6927)
  * Don't depend on cassandra config for nodetool ring (CASSANDRA-7508)
  * (cqlsh) Fix failing cqlsh formatting tests (CASSANDRA-7703)
  * Fix MS expiring map timeout for Paxos messages (CASSANDRA-7752)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44764c03/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java
new file mode 100644
index 0000000..c0e91da
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/AbstractBulkOutputFormat.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hadoop;
+
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.*;
+
+public abstract class AbstractBulkOutputFormat<K, V> extends OutputFormat<K, V>
+    implements org.apache.hadoop.mapred.OutputFormat<K, V>
+{
+    @Override
+    public void checkOutputSpecs(JobContext context)
+    {
+        checkOutputSpecs(HadoopCompat.getConfiguration(context));
+    }
+
+    private void checkOutputSpecs(Configuration conf)
+    {
+        if (ConfigHelper.getOutputKeyspace(conf) == null)
+        {
+            throw new UnsupportedOperationException("you must set the keyspace with setColumnFamily()");
+        }
+    }
+
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
+    {
+        return new NullOutputCommitter();
+    }
+
+    /** Fills the deprecated OutputFormat interface for streaming. */
+    @Deprecated
+    public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
+    {
+        checkOutputSpecs(job);
+    }
+
+    public static class NullOutputCommitter extends OutputCommitter
+    {
+        public void abortTask(TaskAttemptContext taskContext) { }
+
+        public void cleanupJob(JobContext jobContext) { }
+
+        public void commitTask(TaskAttemptContext taskContext) { }
+
+        public boolean needsTaskCommit(TaskAttemptContext taskContext)
+        {
+            return false;
+        }
+
+        public void setupJob(JobContext jobContext) { }
+
+        public void setupTask(TaskAttemptContext taskContext) { }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44764c03/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java
new file mode 100644
index 0000000..22255a6
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/AbstractBulkRecordWriter.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hadoop;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.cassandra.auth.IAuthenticator;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.SSTableLoader;
+import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.thrift.AuthenticationRequest;
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.CfDef;
+import org.apache.cassandra.thrift.KsDef;
+import org.apache.cassandra.thrift.TokenRange;
+import org.apache.cassandra.utils.OutputHandler;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.Progressable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractBulkRecordWriter<K, V> extends RecordWriter<K, V>
+implements org.apache.hadoop.mapred.RecordWriter<K, V>
+{
+    public final static String OUTPUT_LOCATION = "mapreduce.output.bulkoutputformat.localdir";
+    public final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize";
+    public final static String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits";
+    public final static String MAX_FAILED_HOSTS = "mapreduce.output.bulkoutputformat.maxfailedhosts";
+    
+    private final Logger logger = LoggerFactory.getLogger(AbstractBulkRecordWriter.class);
+    
+    protected final Configuration conf;
+    protected final int maxFailures;
+    protected final int bufferSize; 
+    protected Closeable writer;
+    protected SSTableLoader loader;
+    protected Progressable progress;
+    protected TaskAttemptContext context;
+    
+    protected AbstractBulkRecordWriter(TaskAttemptContext context)
+    {
+        this(HadoopCompat.getConfiguration(context));
+        this.context = context;
+    }
+
+    protected AbstractBulkRecordWriter(Configuration conf, Progressable progress)
+    {
+        this(conf);
+        this.progress = progress;
+    }
+
+    protected AbstractBulkRecordWriter(Configuration conf)
+    {
+        Config.setClientMode(true);
+        Config.setOutboundBindAny(true);
+        this.conf = conf;
+        DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.parseInt(conf.get(STREAM_THROTTLE_MBITS, "0")));
+        maxFailures = Integer.parseInt(conf.get(MAX_FAILED_HOSTS, "0"));
+        bufferSize = Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64"));
+    }
+
+    protected String getOutputLocation() throws IOException
+    {
+        String dir = conf.get(OUTPUT_LOCATION, System.getProperty("java.io.tmpdir"));
+        if (dir == null)
+            throw new IOException("Output directory not defined, if hadoop is not setting java.io.tmpdir then define " + OUTPUT_LOCATION);
+        return dir;
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException, InterruptedException
+    {
+        close();
+    }
+
+    /** Fills the deprecated RecordWriter interface for streaming. */
+    @Deprecated
+    public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
+    {
+        close();
+    }
+
+    private void close() throws IOException
+    {
+        if (writer != null)
+        {
+            writer.close();
+            Future<StreamState> future = loader.stream();
+            while (true)
+            {
+                try
+                {
+                    future.get(1000, TimeUnit.MILLISECONDS);
+                    break;
+                }
+                catch (ExecutionException | TimeoutException te)
+                {
+                    if (null != progress)
+                        progress.progress();
+                    if (null != context)
+                        HadoopCompat.progress(context);
+                }
+                catch (InterruptedException e)
+                {
+                    throw new IOException(e);
+                }
+            }
+            if (loader.getFailedHosts().size() > 0)
+            {
+                if (loader.getFailedHosts().size() > maxFailures)
+                    throw new IOException("Too many hosts failed: " + loader.getFailedHosts());
+                else
+                    logger.warn("Some hosts failed: {}", loader.getFailedHosts());
+            }
+        }
+    }
+
+    public static class ExternalClient extends SSTableLoader.Client
+    {
+        private final Map<String, Map<String, CFMetaData>> knownCfs = new HashMap<>();
+        private final Configuration conf;
+        private final String hostlist;
+        private final int rpcPort;
+        private final String username;
+        private final String password;
+
+        public ExternalClient(Configuration conf)
+        {
+          super();
+          this.conf = conf;
+          this.hostlist = ConfigHelper.getOutputInitialAddress(conf);
+          this.rpcPort = ConfigHelper.getOutputRpcPort(conf);
+          this.username = ConfigHelper.getOutputKeyspaceUserName(conf);
+          this.password = ConfigHelper.getOutputKeyspacePassword(conf);
+        }
+
+        public void init(String keyspace)
+        {
+            Set<InetAddress> hosts = new HashSet<InetAddress>();
+            String[] nodes = hostlist.split(",");
+            for (String node : nodes)
+            {
+                try
+                {
+                    hosts.add(InetAddress.getByName(node));
+                }
+                catch (UnknownHostException e)
+                {
+                    throw new RuntimeException(e);
+                }
+            }
+            Iterator<InetAddress> hostiter = hosts.iterator();
+            while (hostiter.hasNext())
+            {
+                try
+                {
+                    InetAddress host = hostiter.next();
+                    Cassandra.Client client = ConfigHelper.createConnection(conf, host.getHostAddress(), rpcPort);
+
+                    // log in
+                    client.set_keyspace(keyspace);
+                    if (username != null)
+                    {
+                        Map<String, String> creds = new HashMap<String, String>();
+                        creds.put(IAuthenticator.USERNAME_KEY, username);
+                        creds.put(IAuthenticator.PASSWORD_KEY, password);
+                        AuthenticationRequest authRequest = new AuthenticationRequest(creds);
+                        client.login(authRequest);
+                    }
+
+                    List<TokenRange> tokenRanges = client.describe_ring(keyspace);
+                    List<KsDef> ksDefs = client.describe_keyspaces();
+
+                    setPartitioner(client.describe_partitioner());
+                    Token.TokenFactory tkFactory = getPartitioner().getTokenFactory();
+
+                    for (TokenRange tr : tokenRanges)
+                    {
+                        Range<Token> range = new Range<Token>(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token));
+                        for (String ep : tr.endpoints)
+                        {
+                            addRangeForEndpoint(range, InetAddress.getByName(ep));
+                        }
+                    }
+
+                    for (KsDef ksDef : ksDefs)
+                    {
+                        Map<String, CFMetaData> cfs = new HashMap<>(ksDef.cf_defs.size());
+                        for (CfDef cfDef : ksDef.cf_defs)
+                            cfs.put(cfDef.name, CFMetaData.fromThrift(cfDef));
+                        knownCfs.put(ksDef.name, cfs);
+                    }
+                    break;
+                }
+                catch (Exception e)
+                {
+                    if (!hostiter.hasNext())
+                        throw new RuntimeException("Could not retrieve endpoint ranges: ", e);
+                }
+            }
+        }
+
+        public CFMetaData getCFMetaData(String keyspace, String cfName)
+        {
+            Map<String, CFMetaData> cfs = knownCfs.get(keyspace);
+            return cfs != null ? cfs.get(cfName) : null;
+        } 
+    }
+
+    public static class NullOutputHandler implements OutputHandler
+    {
+        public void output(String msg) {}
+        public void debug(String msg) {}
+        public void warn(String msg) {}
+        public void warn(String msg, Throwable th) {}
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44764c03/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
index c3d8e05..f5a5a8d 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkOutputFormat.java
@@ -23,39 +23,10 @@ import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.cassandra.thrift.Mutation;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.*;
 
-public class BulkOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>>
-    implements org.apache.hadoop.mapred.OutputFormat<ByteBuffer,List<Mutation>>
+public class BulkOutputFormat extends AbstractBulkOutputFormat<ByteBuffer,List<Mutation>>
 {
-    @Override
-    public void checkOutputSpecs(JobContext context)
-    {
-        checkOutputSpecs(HadoopCompat.getConfiguration(context));
-    }
-
-    private void checkOutputSpecs(Configuration conf)
-    {
-        if (ConfigHelper.getOutputKeyspace(conf) == null)
-        {
-            throw new UnsupportedOperationException("you must set the keyspace with setColumnFamily()");
-        }
-    }
-
-    @Override
-    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
-    {
-        return new NullOutputCommitter();
-    }
-
-    /** Fills the deprecated OutputFormat interface for streaming. */
-    @Deprecated
-    public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
-    {
-        checkOutputSpecs(job);
-    }
-
     /** Fills the deprecated OutputFormat interface for streaming. */
     @Deprecated
     public BulkRecordWriter getRecordWriter(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job, String name, org.apache.hadoop.util.Progressable progress) throws IOException
@@ -68,22 +39,4 @@ public class BulkOutputFormat extends OutputFormat<ByteBuffer,List<Mutation>>
     {
         return new BulkRecordWriter(context);
     }
-
-    public static class NullOutputCommitter extends OutputCommitter
-    {
-        public void abortTask(TaskAttemptContext taskContext) { }
-
-        public void cleanupJob(JobContext jobContext) { }
-
-        public void commitTask(TaskAttemptContext taskContext) { }
-
-        public boolean needsTaskCommit(TaskAttemptContext taskContext)
-        {
-            return false;
-        }
-
-        public void setupJob(JobContext jobContext) { }
-
-        public void setupTask(TaskAttemptContext taskContext) { }
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44764c03/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
index 8bfc958..d67b856 100644
--- a/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
+++ b/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
@@ -19,57 +19,25 @@ package org.apache.cassandra.hadoop;
 
 import java.io.File;
 import java.io.IOException;
-import java.net.InetAddress;
 import java.nio.ByteBuffer;
-import java.net.UnknownHostException;
-import java.util.*;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.List;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.cassandra.auth.IAuthenticator;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.Config;
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.io.sstable.SSTableLoader;
 import org.apache.cassandra.io.sstable.SSTableSimpleUnsortedWriter;
-import org.apache.cassandra.streaming.StreamState;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.OutputHandler;
+import org.apache.cassandra.thrift.Column;
+import org.apache.cassandra.thrift.CounterColumn;
+import org.apache.cassandra.thrift.Mutation;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.thrift.protocol.*;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
 import org.apache.hadoop.util.Progressable;
 
-final class BulkRecordWriter extends RecordWriter<ByteBuffer,List<Mutation>>
-implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
+public final class BulkRecordWriter extends AbstractBulkRecordWriter<ByteBuffer, List<Mutation>>
 {
-    private final static String OUTPUT_LOCATION = "mapreduce.output.bulkoutputformat.localdir";
-    private final static String BUFFER_SIZE_IN_MB = "mapreduce.output.bulkoutputformat.buffersize";
-    private final static String STREAM_THROTTLE_MBITS = "mapreduce.output.bulkoutputformat.streamthrottlembits";
-    private final static String MAX_FAILED_HOSTS = "mapreduce.output.bulkoutputformat.maxfailedhosts";
-    private final Configuration conf;
-    private final Logger logger = LoggerFactory.getLogger(BulkRecordWriter.class);
-    private SSTableSimpleUnsortedWriter writer;
-    private SSTableLoader loader;
-    private File outputdir;
-    private Progressable progress;
-    private TaskAttemptContext context;
-    private int maxFailures;
-
+    private File outputDir;
+    
+    
     private enum CFType
     {
         NORMAL,
@@ -87,31 +55,17 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
 
     BulkRecordWriter(TaskAttemptContext context)
     {
-        this(HadoopCompat.getConfiguration(context));
-        this.context = context;
+        super(context);
     }
 
     BulkRecordWriter(Configuration conf, Progressable progress)
     {
-        this(conf);
-        this.progress = progress;
+        super(conf, progress);
     }
 
     BulkRecordWriter(Configuration conf)
     {
-        Config.setClientMode(true);
-        Config.setOutboundBindAny(true);
-        this.conf = conf;
-        DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(Integer.parseInt(conf.get(STREAM_THROTTLE_MBITS, "0")));
-        maxFailures = Integer.parseInt(conf.get(MAX_FAILED_HOSTS, "0"));
-    }
-
-    private String getOutputLocation() throws IOException
-    {
-        String dir = conf.get(OUTPUT_LOCATION, System.getProperty("java.io.tmpdir"));
-        if (dir == null)
-            throw new IOException("Output directory not defined, if hadoop is not setting java.io.tmpdir then define " + OUTPUT_LOCATION);
-        return dir;
+        super(conf);
     }
 
     private void setTypes(Mutation mutation)
@@ -131,26 +85,23 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
 
     private void prepareWriter() throws IOException
     {
-        if (outputdir == null)
+        if (outputDir == null)
         {
             String keyspace = ConfigHelper.getOutputKeyspace(conf);
             //dir must be named by ks/cf for the loader
-            outputdir = new File(getOutputLocation() + File.separator + keyspace + File.separator + ConfigHelper.getOutputColumnFamily(conf));
-            outputdir.mkdirs();
+            outputDir = new File(getOutputLocation() + File.separator + keyspace + File.separator + ConfigHelper.getOutputColumnFamily(conf));
+            outputDir.mkdirs();
         }
         
         if (writer == null)
         {
             AbstractType<?> subcomparator = null;
-            ExternalClient externalClient = null;
-            String username = ConfigHelper.getOutputKeyspaceUserName(conf);
-            String password = ConfigHelper.getOutputKeyspacePassword(conf);
 
             if (cfType == CFType.SUPER)
                 subcomparator = BytesType.instance;
 
-            this.writer = new SSTableSimpleUnsortedWriter(
-                    outputdir,
+            writer = new SSTableSimpleUnsortedWriter(
+                    outputDir,
                     ConfigHelper.getOutputPartitioner(conf),
                     ConfigHelper.getOutputKeyspace(conf),
                     ConfigHelper.getOutputColumnFamily(conf),
@@ -159,12 +110,7 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
                     Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64")),
                     ConfigHelper.getOutputCompressionParamaters(conf));
 
-            externalClient = new ExternalClient(ConfigHelper.getOutputInitialAddress(conf),
-                                                ConfigHelper.getOutputRpcPort(conf),
-                                                username,
-                                                password);
-
-            this.loader = new SSTableLoader(outputdir, externalClient, new NullOutputHandler());
+            this.loader = new SSTableLoader(outputDir, new ExternalClient(conf), new NullOutputHandler());
         }
     }
 
@@ -173,36 +119,37 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
     {
         setTypes(value.get(0));
         prepareWriter();
-        writer.newRow(keybuff);
+        SSTableSimpleUnsortedWriter ssWriter = (SSTableSimpleUnsortedWriter) writer;
+        ssWriter.newRow(keybuff);
         for (Mutation mut : value)
         {
             if (cfType == CFType.SUPER)
             {
-                writer.newSuperColumn(mut.getColumn_or_supercolumn().getSuper_column().name);
+                ssWriter.newSuperColumn(mut.getColumn_or_supercolumn().getSuper_column().name);
                 if (colType == ColType.COUNTER)
                     for (CounterColumn column : mut.getColumn_or_supercolumn().getCounter_super_column().columns)
-                        writer.addCounterColumn(column.name, column.value);
+                        ssWriter.addCounterColumn(column.name, column.value);
                 else
                 {
                     for (Column column : mut.getColumn_or_supercolumn().getSuper_column().columns)
                     {
                         if(column.ttl == 0)
-                            writer.addColumn(column.name, column.value, column.timestamp);
+                            ssWriter.addColumn(column.name, column.value, column.timestamp);
                         else
-                            writer.addExpiringColumn(column.name, column.value, column.timestamp, column.ttl, System.currentTimeMillis() + ((long)column.ttl * 1000));
+                            ssWriter.addExpiringColumn(column.name, column.value, column.timestamp, column.ttl, System.currentTimeMillis() + ((long)column.ttl * 1000));
                     }
                 }
             }
             else
             {
                 if (colType == ColType.COUNTER)
-                    writer.addCounterColumn(mut.getColumn_or_supercolumn().counter_column.name, mut.getColumn_or_supercolumn().counter_column.value);
+                    ssWriter.addCounterColumn(mut.getColumn_or_supercolumn().counter_column.name, mut.getColumn_or_supercolumn().counter_column.value);
                 else
                 {
                     if(mut.getColumn_or_supercolumn().column.ttl == 0)
-                        writer.addColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp);
+                        ssWriter.addColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp);
                     else
-                        writer.addExpiringColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp, mut.getColumn_or_supercolumn().column.ttl, System.currentTimeMillis() + ((long)(mut.getColumn_or_supercolumn().column.ttl) * 1000));
+                        ssWriter.addExpiringColumn(mut.getColumn_or_supercolumn().column.name, mut.getColumn_or_supercolumn().column.value, mut.getColumn_or_supercolumn().column.timestamp, mut.getColumn_or_supercolumn().column.ttl, System.currentTimeMillis() + ((long)(mut.getColumn_or_supercolumn().column.ttl) * 1000));
                 }
             }
             if (null != progress)
@@ -211,158 +158,4 @@ implements org.apache.hadoop.mapred.RecordWriter<ByteBuffer,List<Mutation>>
                 HadoopCompat.progress(context);
         }
     }
-    @Override
-    public void close(TaskAttemptContext context) throws IOException, InterruptedException
-    {
-        close();
-    }
-
-    /** Fills the deprecated RecordWriter interface for streaming. */
-    @Deprecated
-    public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
-    {
-        close();
-    }
-
-    private void close() throws IOException
-    {
-        if (writer != null)
-        {
-            writer.close();
-            Future<StreamState> future = loader.stream();
-            while (true)
-            {
-                try
-                {
-                    future.get(1000, TimeUnit.MILLISECONDS);
-                    break;
-                }
-                catch (ExecutionException | TimeoutException te)
-                {
-                    if (null != progress)
-                        progress.progress();
-                    if (null != context)
-                        HadoopCompat.progress(context);
-                }
-                catch (InterruptedException e)
-                {
-                    throw new IOException(e);
-                }
-            }
-            if (loader.getFailedHosts().size() > 0)
-            {
-                if (loader.getFailedHosts().size() > maxFailures)
-                    throw new IOException("Too many hosts failed: " + loader.getFailedHosts());
-                else
-                    logger.warn("Some hosts failed: " + loader.getFailedHosts());
-            }
-        }
-    }
-
-    static class ExternalClient extends SSTableLoader.Client
-    {
-        private final Map<String, Map<String, CFMetaData>> knownCfs = new HashMap<>();
-        private final String hostlist;
-        private final int rpcPort;
-        private final String username;
-        private final String password;
-
-        public ExternalClient(String hostlist, int port, String username, String password)
-        {
-            super();
-            this.hostlist = hostlist;
-            this.rpcPort = port;
-            this.username = username;
-            this.password = password;
-        }
-
-        public void init(String keyspace)
-        {
-            Set<InetAddress> hosts = new HashSet<InetAddress>();
-            String[] nodes = hostlist.split(",");
-            for (String node : nodes)
-            {
-                try
-                {
-                    hosts.add(InetAddress.getByName(node));
-                }
-                catch (UnknownHostException e)
-                {
-                    throw new RuntimeException(e);
-                }
-            }
-            Iterator<InetAddress> hostiter = hosts.iterator();
-            while (hostiter.hasNext())
-            {
-                try
-                {
-                    InetAddress host = hostiter.next();
-                    Cassandra.Client client = createThriftClient(host.getHostAddress(), rpcPort);
-
-                    // log in
-                    client.set_keyspace(keyspace);
-                    if (username != null)
-                    {
-                        Map<String, String> creds = new HashMap<String, String>();
-                        creds.put(IAuthenticator.USERNAME_KEY, username);
-                        creds.put(IAuthenticator.PASSWORD_KEY, password);
-                        AuthenticationRequest authRequest = new AuthenticationRequest(creds);
-                        client.login(authRequest);
-                    }
-
-                    List<TokenRange> tokenRanges = client.describe_ring(keyspace);
-                    List<KsDef> ksDefs = client.describe_keyspaces();
-
-                    setPartitioner(client.describe_partitioner());
-                    Token.TokenFactory tkFactory = getPartitioner().getTokenFactory();
-
-                    for (TokenRange tr : tokenRanges)
-                    {
-                        Range<Token> range = new Range<Token>(tkFactory.fromString(tr.start_token), tkFactory.fromString(tr.end_token));
-                        for (String ep : tr.endpoints)
-                        {
-                            addRangeForEndpoint(range, InetAddress.getByName(ep));
-                        }
-                    }
-
-                    for (KsDef ksDef : ksDefs)
-                    {
-                        Map<String, CFMetaData> cfs = new HashMap<>(ksDef.cf_defs.size());
-                        for (CfDef cfDef : ksDef.cf_defs)
-                            cfs.put(cfDef.name, CFMetaData.fromThrift(cfDef));
-                        knownCfs.put(ksDef.name, cfs);
-                    }
-                    break;
-                }
-                catch (Exception e)
-                {
-                    if (!hostiter.hasNext())
-                        throw new RuntimeException("Could not retrieve endpoint ranges: ", e);
-                }
-            }
-        }
-
-        public CFMetaData getCFMetaData(String keyspace, String cfName)
-        {
-            Map<String, CFMetaData> cfs = knownCfs.get(keyspace);
-            return cfs != null ? cfs.get(cfName) : null;
-        }
-
-        private static Cassandra.Client createThriftClient(String host, int port) throws TTransportException
-        {
-            TSocket socket = new TSocket(host, port);
-            TTransport trans = new TFramedTransport(socket);
-            trans.open();
-            TProtocol protocol = new org.apache.thrift.protocol.TBinaryProtocol(trans);
-            return new Cassandra.Client(protocol);
-        }
-    }
-
-    static class NullOutputHandler implements OutputHandler
-    {
-        public void output(String msg) {}
-        public void debug(String msg) {}
-        public void warn(String msg) {}
-        public void warn(String msg, Throwable th) {}
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44764c03/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
new file mode 100644
index 0000000..58e05b6
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkOutputFormat.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hadoop.cql3;
+
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.cassandra.hadoop.AbstractBulkOutputFormat;
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * The <code>CqlBulkOutputFormat</code> acts as a Hadoop-specific
+ * OutputFormat that allows reduce tasks to store keys (and corresponding
+ * bound variable values) as CQL rows (and respective columns) in a given
+ * ColumnFamily.
+ *
+ * <p>
+ * As is the case with the {@link org.apache.cassandra.hadoop.CqlOutputFormat}, 
+ * you need to set the prepared statement in your
+ * Hadoop job Configuration. The {@link CqlConfigHelper} class, through its
+ * {@link ConfigHelper#setOutputPreparedStatement} method, is provided to make this
+ * simple.
+ * you need to set the Keyspace. The {@link ConfigHelper} class, through its
+ * {@link ConfigHelper#setOutputColumnFamily} method, is provided to make this
+ * simple.
+ * </p>
+ */
+public class CqlBulkOutputFormat extends AbstractBulkOutputFormat<Object, List<ByteBuffer>>
+{   
+  
+    private static final String OUTPUT_CQL_SCHEMA_PREFIX = "cassandra.columnfamily.schema.";
+    private static final String OUTPUT_CQL_INSERT_PREFIX = "cassandra.columnfamily.insert.";
+  
+    /** Fills the deprecated OutputFormat interface for streaming. */
+    @Deprecated
+    public CqlBulkRecordWriter getRecordWriter(FileSystem filesystem, JobConf job, String name, Progressable progress) throws IOException
+    {
+        return new CqlBulkRecordWriter(job, progress);
+    }
+
+    /**
+     * Get the {@link RecordWriter} for the given task.
+     *
+     * @param context
+     *            the information about the current task.
+     * @return a {@link RecordWriter} to write the output for the job.
+     * @throws IOException
+     */
+    public CqlBulkRecordWriter getRecordWriter(final TaskAttemptContext context) throws IOException, InterruptedException
+    {
+        return new CqlBulkRecordWriter(context);
+    }
+    
+    public static void setColumnFamilySchema(Configuration conf, String columnFamily, String schema)
+    {
+        conf.set(OUTPUT_CQL_SCHEMA_PREFIX + columnFamily, schema);
+    }
+
+    public static void setColumnFamilyInsertStatement(Configuration conf, String columnFamily, String insertStatement)
+    {
+        conf.set(OUTPUT_CQL_INSERT_PREFIX + columnFamily, insertStatement);
+    }
+    
+    public static String getColumnFamilySchema(Configuration conf, String columnFamily)
+    {
+        String schema = conf.get(OUTPUT_CQL_SCHEMA_PREFIX + columnFamily);
+        if (schema == null)
+        { 
+            throw new UnsupportedOperationException("You must set the ColumnFamily schema using setColumnFamilySchema.");
+        }
+        return schema; 
+    }
+    
+    public static String getColumnFamilyInsertStatement(Configuration conf, String columnFamily)
+    {
+        String insert = conf.get(OUTPUT_CQL_INSERT_PREFIX + columnFamily); 
+        if (insert == null)
+        {
+            throw new UnsupportedOperationException("You must set the ColumnFamily insert statement using setColumnFamilySchema.");
+        }
+        return insert;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44764c03/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
new file mode 100644
index 0000000..7a75bb4
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlBulkRecordWriter.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hadoop.cql3;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.hadoop.AbstractBulkRecordWriter;
+import org.apache.cassandra.hadoop.BulkRecordWriter;
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.hadoop.HadoopCompat;
+import org.apache.cassandra.io.sstable.CQLSSTableWriter;
+import org.apache.cassandra.io.sstable.SSTableLoader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * The <code>CqlBulkRecordWriter</code> maps the output &lt;key, value&gt;
+ * pairs to a Cassandra column family. In particular, it applies the binded variables
+ * in the value to the prepared statement, which it associates with the key, and in 
+ * turn the responsible endpoint.
+ *
+ * <p>
+ * Furthermore, this writer groups the cql queries by the endpoint responsible for
+ * the rows being affected. This allows the cql queries to be executed in parallel,
+ * directly to a responsible endpoint.
+ * </p>
+ *
+ * @see CqlBulkOutputFormat
+ */
+public class CqlBulkRecordWriter extends AbstractBulkRecordWriter<Object, List<ByteBuffer>>
+{
+    private String keyspace;
+    private String columnFamily;
+    private String schema;
+    private String insertStatement;
+    private File outputDir;
+
+    CqlBulkRecordWriter(TaskAttemptContext context) throws IOException
+    {
+        super(context);
+        setConfigs();
+    }
+
+    CqlBulkRecordWriter(Configuration conf, Progressable progress) throws IOException
+    {
+        super(conf, progress);
+        setConfigs();
+    }
+
+    CqlBulkRecordWriter(Configuration conf) throws IOException
+    {
+        super(conf);
+        setConfigs();
+    }
+    
+    private void setConfigs() throws IOException
+    {
+        // if anything is missing, exceptions will be thrown here, instead of on write()
+        keyspace = ConfigHelper.getOutputKeyspace(conf);
+        columnFamily = ConfigHelper.getOutputColumnFamily(conf);
+        schema = CqlBulkOutputFormat.getColumnFamilySchema(conf, columnFamily);
+        insertStatement = CqlBulkOutputFormat.getColumnFamilyInsertStatement(conf, columnFamily);
+        outputDir = getColumnFamilyDirectory();
+    }
+
+    
+    private void prepareWriter() throws IOException
+    {
+        try
+        {
+            if (writer == null)
+            {
+                writer = CQLSSTableWriter.builder()
+                    .forTable(schema)
+                    .using(insertStatement)
+                    .withPartitioner(ConfigHelper.getOutputPartitioner(conf))
+                    .inDirectory(outputDir)
+                    .withBufferSizeInMB(Integer.parseInt(conf.get(BUFFER_SIZE_IN_MB, "64")))
+                    .build();
+            }
+            if (loader == null)
+            {
+                ExternalClient externalClient = new ExternalClient(conf);
+                
+                externalClient.addKnownCfs(keyspace, schema);
+
+                this.loader = new SSTableLoader(outputDir, externalClient, new BulkRecordWriter.NullOutputHandler());
+            }
+        }
+        catch (Exception e)
+        {
+            throw new IOException(e);
+        }      
+    }
+    
+    /**
+     * The column values must correspond to the order in which
+     * they appear in the insert stored procedure. 
+     * 
+     * Key is not used, so it can be null or any object.
+     * </p>
+     *
+     * @param key
+     *            any object or null.
+     * @param values
+     *            the values to write.
+     * @throws IOException
+     */
+    @Override
+    public void write(Object key, List<ByteBuffer> values) throws IOException
+    {
+        prepareWriter();
+        try
+        {
+            ((CQLSSTableWriter) writer).rawAddRow(values);
+            
+            if (null != progress)
+                progress.progress();
+            if (null != context)
+                HadoopCompat.progress(context);
+        } 
+        catch (InvalidRequestException e)
+        {
+            throw new IOException("Error adding row with key: " + key, e);
+        }
+    }
+    
+    private File getColumnFamilyDirectory() throws IOException
+    {
+        File dir = new File(String.format("%s%s%s%s%s", getOutputLocation(), File.separator, keyspace, File.separator, columnFamily));
+        
+        if (!dir.exists() && !dir.mkdirs())
+        {
+            throw new IOException("Failed to created output directory: " + dir);
+        }
+        
+        return dir;
+    }
+    
+    public static class ExternalClient extends AbstractBulkRecordWriter.ExternalClient
+    {
+        private Map<String, Map<String, CFMetaData>> knownCqlCfs = new HashMap<>();
+        
+        public ExternalClient(Configuration conf)
+        {
+            super(conf);
+        }
+
+        public void addKnownCfs(String keyspace, String cql)
+        {
+            Map<String, CFMetaData> cfs = knownCqlCfs.get(keyspace);
+            
+            if (cfs == null)
+            {
+                cfs = new HashMap<>();
+                knownCqlCfs.put(keyspace, cfs);
+            }
+            
+            CFMetaData metadata = CFMetaData.compile(cql, keyspace);
+            cfs.put(metadata.cfName, metadata);
+        }
+        
+        @Override
+        public CFMetaData getCFMetaData(String keyspace, String cfName)
+        {
+            CFMetaData metadata = super.getCFMetaData(keyspace, cfName);
+            if (metadata != null)
+            {
+                return metadata;
+            }
+            
+            Map<String, CFMetaData> cfs = knownCqlCfs.get(keyspace);
+            return cfs != null ? cfs.get(cfName) : null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44764c03/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
index b2c8fbf..e894996 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
@@ -89,7 +89,7 @@ public class CqlConfigHelper
     private static final String INPUT_NATIVE_SSL_CIPHER_SUITES = "cassandra.input.native.ssl.cipher.suites";
 
     private static final String OUTPUT_CQL = "cassandra.output.cql";
-
+    
     /**
      * Set the CQL columns for the input of this job.
      *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44764c03/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java b/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
index 7c89bef..5845175 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlOutputFormat.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.mapreduce.*;
 /**
  * The <code>ColumnFamilyOutputFormat</code> acts as a Hadoop-specific
  * OutputFormat that allows reduce tasks to store keys (and corresponding
- *  binded variable values) as CQL rows (and respective columns) in a given
+ * bound variable values) as CQL rows (and respective columns) in a given
  * ColumnFamily.
  *
  * <p>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44764c03/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
index e337185..db87226 100644
--- a/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/AbstractSSTableSimpleWriter.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.io.sstable;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
@@ -34,7 +35,7 @@ import org.apache.cassandra.utils.CounterId;
 import org.apache.cassandra.utils.HeapAllocator;
 import org.apache.cassandra.utils.Pair;
 
-public abstract class AbstractSSTableSimpleWriter
+public abstract class AbstractSSTableSimpleWriter implements Closeable
 {
     protected final File directory;
     protected final CFMetaData metadata;
@@ -161,13 +162,6 @@ public abstract class AbstractSSTableSimpleWriter
     }
 
     /**
-     * Close this writer.
-     * This method should be called, otherwise the produced sstables are not
-     * guaranteed to be complete (and won't be in practice).
-     */
-    public abstract void close() throws IOException;
-
-    /**
      * Package protected for use by AbstractCQLSSTableWriter.
      * Not meant to be exposed publicly.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/44764c03/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
index a7ece70..61990ec 100644
--- a/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/CQLSSTableWriter.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.io.sstable;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -71,7 +72,7 @@ import org.apache.cassandra.utils.Pair;
  *   writer.close();
  * </pre>
  */
-public class CQLSSTableWriter
+public class CQLSSTableWriter implements Closeable
 {
     private final AbstractSSTableSimpleWriter writer;
     private final UpdateStatement insert;