You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/06/06 21:19:22 UTC

[10/10] git commit: merge from 1.2

merge from 1.2


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/90052d5a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/90052d5a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/90052d5a

Branch: refs/heads/trunk
Commit: 90052d5a460efefd2dca7a0a5c5fbd54c9674d75
Parents: 6a9e6d5
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu Jun 6 14:16:28 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu Jun 6 14:18:58 2013 -0500

----------------------------------------------------------------------
 examples/hadoop_cql3_word_count/README.txt         |   47 +
 examples/hadoop_cql3_word_count/bin/word_count     |   61 ++
 .../hadoop_cql3_word_count/bin/word_count_counters |   58 ++
 .../hadoop_cql3_word_count/bin/word_count_setup    |   61 ++
 examples/hadoop_cql3_word_count/build.xml          |  113 +++
 .../hadoop_cql3_word_count/conf/log4j.properties   |   32 +
 examples/hadoop_cql3_word_count/ivy.xml            |   24 +
 examples/hadoop_cql3_word_count/src/WordCount.java |  236 +++++
 .../src/WordCountCounters.java                     |  122 +++
 .../hadoop_cql3_word_count/src/WordCountSetup.java |  213 ++++
 .../hadoop/AbstractColumnFamilyInputFormat.java    |  346 +++++++
 .../hadoop/AbstractColumnFamilyOutputFormat.java   |  159 +++
 .../hadoop/AbstractColumnFamilyRecordWriter.java   |  193 ++++
 .../cassandra/hadoop/ColumnFamilyInputFormat.java  |  307 +------
 .../cassandra/hadoop/ColumnFamilyOutputFormat.java |  122 +---
 .../cassandra/hadoop/ColumnFamilyRecordReader.java |   60 +-
 .../cassandra/hadoop/ColumnFamilyRecordWriter.java |  190 +---
 .../apache/cassandra/hadoop/ColumnFamilySplit.java |    4 +-
 .../org/apache/cassandra/hadoop/ConfigHelper.java  |   59 +-
 .../org/apache/cassandra/hadoop/Progressable.java  |    4 +-
 .../cassandra/hadoop/cql3/CQLConfigHelper.java     |  109 ++
 .../hadoop/cql3/ColumnFamilyInputFormat.java       |   83 ++
 .../hadoop/cql3/ColumnFamilyOutputFormat.java      |   78 ++
 .../hadoop/cql3/ColumnFamilyRecordReader.java      |  763 +++++++++++++++
 .../hadoop/cql3/ColumnFamilyRecordWriter.java      |  386 ++++++++
 .../cassandra/thrift/TClientTransportFactory.java  |   70 ++
 .../cassandra/thrift/TFramedTransportFactory.java  |   24 +-
 27 files changed, 3272 insertions(+), 652 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/90052d5a/examples/hadoop_cql3_word_count/README.txt
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/README.txt b/examples/hadoop_cql3_word_count/README.txt
new file mode 100644
index 0000000..f984b02
--- /dev/null
+++ b/examples/hadoop_cql3_word_count/README.txt
@@ -0,0 +1,47 @@
+Introduction
+============
+
+WordCount hadoop example: Inserts a bunch of words across multiple rows,
+and counts them, with Murmur3Partitioner. The word_count_counters example sums
+the value of counter columns for a key.
+
+The scripts in bin/ assume you are running with cwd of examples/hadoop_cql3_word_count.
+
+
+Running
+=======
+
+First build and start a Cassandra server with the default configuration*, 
+then run
+
+examples/hadoop_cql3_word_count$ ant
+examples/hadoop_cql3_word_count$ bin/word_count_setup
+examples/hadoop_cql3_word_count$ bin/word_count
+examples/hadoop_cql3_word_count$ bin/word_count_counters
+
+In order to view the results in Cassandra, one can use bin/cqlsh and
+perform the following operations:
+$ bin/cqlsh localhost
+> use cql3_worldcount;
+> select * from output_words;
+
+The output of the word count can now be configured. In the bin/word_count
+file, you can specify the OUTPUT_REDUCER. The two options are 'filesystem'
+and 'cassandra'. The filesystem option outputs to the /tmp/word_count*
+directories. The cassandra option outputs to the 'output_words' column family
+in the 'cql3_worldcount' keyspace.  'cassandra' is the default.
+
+Read the code in src/ for more details.
+
+The word_count_counters example sums the counter columns for a row. The output
+is written to a text file in /tmp/word_count_counters.
+
+*If you want to point wordcount at a real cluster, modify the seed
+and listenaddress settings accordingly.
+
+
+Troubleshooting
+===============
+
+word_count uses conf/log4j.properties to log to wc.out.
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/90052d5a/examples/hadoop_cql3_word_count/bin/word_count
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/bin/word_count b/examples/hadoop_cql3_word_count/bin/word_count
new file mode 100644
index 0000000..a0c5aa0
--- /dev/null
+++ b/examples/hadoop_cql3_word_count/bin/word_count
@@ -0,0 +1,61 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cwd=`dirname $0`
+
+# Cassandra class files.
+if [ ! -d $cwd/../../../build/classes/main ]; then
+    echo "Unable to locate cassandra class files" >&2
+    exit 1
+fi
+
+# word_count Jar.
+if [ ! -e $cwd/../build/word_count.jar ]; then
+    echo "Unable to locate word_count jar" >&2
+    exit 1
+fi
+
+CLASSPATH=$CLASSPATH:$cwd/../conf
+CLASSPATH=$CLASSPATH:$cwd/../build/word_count.jar
+CLASSPATH=$CLASSPATH:$cwd/../../../build/classes/main
+CLASSPATH=$CLASSPATH:$cwd/../../../build/classes/thrift
+for jar in $cwd/../build/lib/jars/*.jar; do
+    CLASSPATH=$CLASSPATH:$jar
+done
+for jar in $cwd/../../../lib/*.jar; do
+    CLASSPATH=$CLASSPATH:$jar
+done
+for jar in $cwd/../../../build/lib/jars/*.jar; do
+    CLASSPATH=$CLASSPATH:$jar
+done
+
+if [ -x $JAVA_HOME/bin/java ]; then
+    JAVA=$JAVA_HOME/bin/java
+else
+    JAVA=`which java`
+fi
+
+if [ "x$JAVA" = "x" ]; then
+    echo "Java executable not found (hint: set JAVA_HOME)" >&2
+    exit 1
+fi
+
+OUTPUT_REDUCER=cassandra
+
+#echo $CLASSPATH
+$JAVA -Xmx1G -ea -cp $CLASSPATH WordCount output_reducer=$OUTPUT_REDUCER

http://git-wip-us.apache.org/repos/asf/cassandra/blob/90052d5a/examples/hadoop_cql3_word_count/bin/word_count_counters
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/bin/word_count_counters b/examples/hadoop_cql3_word_count/bin/word_count_counters
new file mode 100644
index 0000000..7793477
--- /dev/null
+++ b/examples/hadoop_cql3_word_count/bin/word_count_counters
@@ -0,0 +1,58 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cwd=`dirname $0`
+
+# Cassandra class files.
+if [ ! -d $cwd/../../../build/classes/main ]; then
+    echo "Unable to locate cassandra class files" >&2
+    exit 1
+fi
+
+# word_count Jar.
+if [ ! -e $cwd/../build/word_count.jar ]; then
+    echo "Unable to locate word_count jar" >&2
+    exit 1
+fi
+
+CLASSPATH=$CLASSPATH:$cwd/../build/word_count.jar
+CLASSPATH=$CLASSPATH:$cwd/../../../build/classes/main
+CLASSPATH=$CLASSPATH:$cwd/../../../build/classes/thrift
+for jar in $cwd/../build/lib/jars/*.jar; do
+    CLASSPATH=$CLASSPATH:$jar
+done
+for jar in $cwd/../../../lib/*.jar; do
+    CLASSPATH=$CLASSPATH:$jar
+done
+for jar in $cwd/../../../build/lib/jars/*.jar; do
+    CLASSPATH=$CLASSPATH:$jar
+done
+
+if [ -x $JAVA_HOME/bin/java ]; then
+    JAVA=$JAVA_HOME/bin/java
+else
+    JAVA=`which java`
+fi
+
+if [ "x$JAVA" = "x" ]; then
+    echo "Java executable not found (hint: set JAVA_HOME)" >&2
+    exit 1
+fi
+
+#echo $CLASSPATH
+$JAVA -Xmx1G -ea -cp $CLASSPATH WordCountCounters

http://git-wip-us.apache.org/repos/asf/cassandra/blob/90052d5a/examples/hadoop_cql3_word_count/bin/word_count_setup
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/bin/word_count_setup b/examples/hadoop_cql3_word_count/bin/word_count_setup
new file mode 100644
index 0000000..d194a45
--- /dev/null
+++ b/examples/hadoop_cql3_word_count/bin/word_count_setup
@@ -0,0 +1,61 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cwd=`dirname $0`
+
+# Cassandra class files.
+if [ ! -d $cwd/../../../build/classes/main ]; then
+    echo "Unable to locate cassandra class files" >&2
+    exit 1
+fi
+
+# word_count Jar.
+if [ ! -e $cwd/../build/word_count.jar ]; then
+    echo "Unable to locate word_count jar" >&2
+    exit 1
+fi
+
+CLASSPATH=$CLASSPATH:$cwd/../build/word_count.jar
+CLASSPATH=$CLASSPATH:.:$cwd/../../../build/classes/main
+CLASSPATH=$CLASSPATH:.:$cwd/../../../build/classes/thrift
+for jar in $cwd/../build/lib/jars/*.jar; do
+    CLASSPATH=$CLASSPATH:$jar
+done
+for jar in $cwd/../../../lib/*.jar; do
+    CLASSPATH=$CLASSPATH:$jar
+done
+for jar in $cwd/../../../build/lib/jars/*.jar; do
+    CLASSPATH=$CLASSPATH:$jar
+done
+
+if [ -x $JAVA_HOME/bin/java ]; then
+    JAVA=$JAVA_HOME/bin/java
+else
+    JAVA=`which java`
+fi
+
+if [ "x$JAVA" = "x" ]; then
+    echo "Java executable not found (hint: set JAVA_HOME)" >&2
+    exit 1
+fi
+
+HOST=localhost
+PORT=9160
+FRAMED=true
+
+$JAVA -Xmx1G -ea -Dcassandra.host=$HOST -Dcassandra.port=$PORT -Dcassandra.framed=$FRAMED -cp $CLASSPATH WordCountSetup

http://git-wip-us.apache.org/repos/asf/cassandra/blob/90052d5a/examples/hadoop_cql3_word_count/build.xml
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/build.xml b/examples/hadoop_cql3_word_count/build.xml
new file mode 100644
index 0000000..939e1b3
--- /dev/null
+++ b/examples/hadoop_cql3_word_count/build.xml
@@ -0,0 +1,113 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements.  See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership.  The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License.  You may obtain a copy of the License at
+ ~
+ ~    http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied.  See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+<project default="jar" name="word_count" xmlns:ivy="antlib:org.apache.ivy.ant">
+    <property name="cassandra.dir" value="../.." />
+    <property name="cassandra.dir.lib" value="${cassandra.dir}/lib" />
+    <property name="cassandra.classes" value="${cassandra.dir}/build/classes" />
+    <property name="build.src" value="${basedir}/src" />
+    <property name="build.dir" value="${basedir}/build" />
+    <property name="ivy.lib.dir" value="${build.dir}/lib" />
+    <property name="build.classes" value="${build.dir}/classes" />
+    <property name="final.name" value="word_count" />
+    <property name="ivy.version" value="2.1.0" />
+    <property name="ivy.url"
+              value="http://repo2.maven.org/maven2/org/apache/ivy/ivy" />
+
+    <condition property="ivy.jar.exists">
+        <available file="${build.dir}/ivy-${ivy.version}.jar" />
+    </condition>
+
+    <path id="autoivy.classpath">
+        <fileset dir="${ivy.lib.dir}">
+            <include name="**/*.jar" />
+        </fileset>
+        <pathelement location="${build.dir}/ivy-${ivy.version}.jar"/>
+    </path>
+
+    <path id="wordcount.build.classpath">
+        <fileset dir="${ivy.lib.dir}">
+            <include name="**/*.jar" />
+        </fileset>
+        <!-- cassandra dependencies -->
+        <fileset dir="${cassandra.dir.lib}">
+            <include name="**/*.jar" />
+        </fileset>
+        <fileset dir="${cassandra.dir}/build/lib/jars">
+            <include name="**/*.jar" />
+        </fileset>
+        <pathelement location="${cassandra.classes}/main" />
+        <pathelement location="${cassandra.classes}/thrift" />
+    </path>
+
+    <target name="init">
+        <mkdir dir="${build.classes}" />
+    </target>
+
+    <target depends="init,ivy-retrieve-build" name="build">
+        <javac destdir="${build.classes}">
+            <src path="${build.src}" />
+            <classpath refid="wordcount.build.classpath" />
+        </javac>
+    </target>
+
+    <target name="jar" depends="build">
+        <mkdir dir="${build.classes}/META-INF" />
+        <jar jarfile="${build.dir}/${final.name}.jar">
+           <fileset dir="${build.classes}" />
+           <fileset dir="${cassandra.classes}/main" />
+           <fileset dir="${cassandra.classes}/thrift" />
+           <fileset dir="${cassandra.dir}">
+               <include name="lib/**/*.jar" />
+           </fileset>
+           <zipfileset dir="${cassandra.dir}/build/lib/jars/" prefix="lib">
+               <include name="**/*.jar" />
+           </zipfileset>
+           <fileset file="${basedir}/cassandra.yaml" />
+        </jar>
+    </target>
+
+    <target name="clean">
+        <delete dir="${build.dir}" />
+    </target>
+
+    <!--
+        Ivy Specific targets
+            to fetch Ivy and this project's dependencies
+    -->
+	<target name="ivy-download" unless="ivy.jar.exists">
+      <echo>Downloading Ivy...</echo>
+      <mkdir dir="${build.dir}" />
+      <get src="${ivy.url}/${ivy.version}/ivy-${ivy.version}.jar"
+           dest="${build.dir}/ivy-${ivy.version}.jar" usetimestamp="true" />
+    </target>
+
+    <target name="ivy-init" depends="ivy-download" unless="ivy.initialized">
+      <mkdir dir="${ivy.lib.dir}"/>
+      <taskdef resource="org/apache/ivy/ant/antlib.xml"
+               uri="antlib:org.apache.ivy.ant"
+               classpathref="autoivy.classpath"/>
+      <property name="ivy.initialized" value="true"/>
+    </target>
+
+    <target name="ivy-retrieve-build" depends="ivy-init">
+      <ivy:retrieve type="jar,source" sync="true"
+             pattern="${ivy.lib.dir}/[type]s/[artifact]-[revision].[ext]" />
+    </target>
+</project>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/90052d5a/examples/hadoop_cql3_word_count/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/conf/log4j.properties b/examples/hadoop_cql3_word_count/conf/log4j.properties
new file mode 100644
index 0000000..70f7657
--- /dev/null
+++ b/examples/hadoop_cql3_word_count/conf/log4j.properties
@@ -0,0 +1,32 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+log4j.rootLogger=DEBUG,stdout,F
+
+#stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%5p %d{HH:mm:ss,SSS} %m%n
+
+# log file
+log4j.appender.F=org.apache.log4j.FileAppender
+log4j.appender.F.Append=false
+log4j.appender.F.layout=org.apache.log4j.PatternLayout
+log4j.appender.F.layout.ConversionPattern=%5p [%t] %d{ISO8601} %F (line %L) %m%n
+# Edit the next line to point to your logs directory
+log4j.appender.F.File=wc.out
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/90052d5a/examples/hadoop_cql3_word_count/ivy.xml
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/ivy.xml b/examples/hadoop_cql3_word_count/ivy.xml
new file mode 100644
index 0000000..9d44895
--- /dev/null
+++ b/examples/hadoop_cql3_word_count/ivy.xml
@@ -0,0 +1,24 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements.  See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership.  The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License.  You may obtain a copy of the License at
+ ~
+ ~    http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied.  See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+<ivy-module version="2.0">
+    <info organisation="apache-cassandra" module="word-count"/>
+    <dependencies>
+        <dependency org="org.apache.hadoop" name="hadoop-core" rev="0.20.2"/>
+    </dependencies>
+</ivy-module>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/90052d5a/examples/hadoop_cql3_word_count/src/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/src/WordCount.java b/examples/hadoop_cql3_word_count/src/WordCount.java
new file mode 100644
index 0000000..09dd9e4
--- /dev/null
+++ b/examples/hadoop_cql3_word_count/src/WordCount.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.Map.Entry;
+
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.hadoop.cql3.ColumnFamilyOutputFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.hadoop.cql3.CQLConfigHelper;
+import org.apache.cassandra.hadoop.cql3.ColumnFamilyInputFormat;
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.Reducer.Context;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import java.nio.charset.CharacterCodingException;
+
+/**
+ * This counts the occurrences of words in the ColumnFamily
+ *   cql3_worldcount.inputs ( user_id text,
+ *                            category_id text,
+ *                            sub_category_id text,
+ *                            title  text,
+ *                            body  text,
+ *                            PRIMARY KEY (user_id, category_id, sub_category_id))
+ *
+ * For each word, we output the total number of occurrences across all body texts.
+ *
+ * When outputting to Cassandra, we write the word counts to column family
+ *  output_words ( row_id1 text,
+ *                 row_id2 text,
+ *                 word text,
+ *                 count_num text,
+ *                 PRIMARY KEY ((row_id1, row_id2), word))
+ * as a {word, count} to columns: word, count_num with a row key of "word sum"
+ */
+public class WordCount extends Configured implements Tool
+{
+    private static final Logger logger = LoggerFactory.getLogger(WordCount.class);
+
+    static final String KEYSPACE = "cql3_worldcount";
+    static final String COLUMN_FAMILY = "inputs";
+
+    static final String OUTPUT_REDUCER_VAR = "output_reducer";
+    static final String OUTPUT_COLUMN_FAMILY = "output_words";
+
+    private static final String OUTPUT_PATH_PREFIX = "/tmp/word_count";
+
+    private static final String PRIMARY_KEY = "row_key";
+
+    public static void main(String[] args) throws Exception
+    {
+        // Let ToolRunner handle generic command-line options
+        ToolRunner.run(new Configuration(), new WordCount(), args);
+        System.exit(0);
+    }
+
+    public static class TokenizerMapper extends Mapper<Map<String, ByteBuffer>, Map<String, ByteBuffer>, Text, IntWritable>
+    {
+        private final static IntWritable one = new IntWritable(1);
+        private Text word = new Text();
+        private ByteBuffer sourceColumn;
+
+        protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
+        throws IOException, InterruptedException
+        {
+        }
+
+        public void map(Map<String, ByteBuffer> keys, Map<String, ByteBuffer> columns, Context context) throws IOException, InterruptedException
+        {
+            for (Entry<String, ByteBuffer> column : columns.entrySet())
+            {
+                if (!"body".equalsIgnoreCase(column.getKey()))
+                    continue;
+
+                String value = ByteBufferUtil.string(column.getValue());
+
+                logger.debug("read {}:{}={} from {}",
+                             new Object[] {toString(keys), column.getKey(), value, context.getInputSplit()});
+
+                StringTokenizer itr = new StringTokenizer(value);
+                while (itr.hasMoreTokens())
+                {
+                    word.set(itr.nextToken());
+                    context.write(word, one);
+                }
+            }
+        }
+
+        private String toString(Map<String, ByteBuffer> keys)
+        {
+            String result = "";
+            try
+            {
+                for (ByteBuffer key : keys.values())
+                    result = result + ByteBufferUtil.string(key) + ":";
+            }
+            catch (CharacterCodingException e)
+            {
+                logger.error("Failed to print keys", e);
+            }
+            return result;
+        }
+    }
+
+    public static class ReducerToFilesystem extends Reducer<Text, IntWritable, Text, IntWritable>
+    {
+        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
+        {
+            int sum = 0;
+            for (IntWritable val : values)
+                sum += val.get();
+            context.write(key, new IntWritable(sum));
+        }
+    }
+
+    public static class ReducerToCassandra extends Reducer<Text, IntWritable, Map<String, ByteBuffer>, List<ByteBuffer>>
+    {
+        private Map<String, ByteBuffer> keys;
+        private ByteBuffer key;
+        protected void setup(org.apache.hadoop.mapreduce.Reducer.Context context)
+        throws IOException, InterruptedException
+        {
+            keys = new LinkedHashMap<String, ByteBuffer>();
+            String[] partitionKeys = context.getConfiguration().get(PRIMARY_KEY).split(",");
+            keys.put("row_id1", ByteBufferUtil.bytes(partitionKeys[0]));
+            keys.put("row_id2", ByteBufferUtil.bytes(partitionKeys[1]));
+        }
+
+        public void reduce(Text word, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
+        {
+            int sum = 0;
+            for (IntWritable val : values)
+                sum += val.get();
+            context.write(keys, getBindVariables(word, sum));
+        }
+
+        private List<ByteBuffer> getBindVariables(Text word, int sum)
+        {
+            List<ByteBuffer> variables = new ArrayList<ByteBuffer>();
+            variables.add(keys.get("row_id1"));
+            variables.add(keys.get("row_id2"));
+            variables.add(ByteBufferUtil.bytes(word.toString()));
+            variables.add(ByteBufferUtil.bytes(String.valueOf(sum)));         
+            return variables;
+        }
+    }
+
+    public int run(String[] args) throws Exception
+    {
+        String outputReducerType = "filesystem";
+        if (args != null && args[0].startsWith(OUTPUT_REDUCER_VAR))
+        {
+            String[] s = args[0].split("=");
+            if (s != null && s.length == 2)
+                outputReducerType = s[1];
+        }
+        logger.info("output reducer type: " + outputReducerType);
+
+        Job job = new Job(getConf(), "wordcount");
+        job.setJarByClass(WordCount.class);
+        job.setMapperClass(TokenizerMapper.class);
+
+        if (outputReducerType.equalsIgnoreCase("filesystem"))
+        {
+            job.setCombinerClass(ReducerToFilesystem.class);
+            job.setReducerClass(ReducerToFilesystem.class);
+            job.setOutputKeyClass(Text.class);
+            job.setOutputValueClass(IntWritable.class);
+            FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX));
+        }
+        else
+        {
+            job.setReducerClass(ReducerToCassandra.class);
+
+            job.setMapOutputKeyClass(Text.class);
+            job.setMapOutputValueClass(IntWritable.class);
+            job.setOutputKeyClass(Map.class);
+            job.setOutputValueClass(List.class);
+
+            job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
+
+            ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KEYSPACE, OUTPUT_COLUMN_FAMILY);
+            job.getConfiguration().set(PRIMARY_KEY, "word,sum");
+            String query = "INSERT INTO " + KEYSPACE + "." + OUTPUT_COLUMN_FAMILY +
+                           " (row_id1, row_id2, word, count_num) " +
+                           " values (?, ?, ?, ?)";
+            CQLConfigHelper.setOutputCql(job.getConfiguration(), query);
+            ConfigHelper.setOutputInitialAddress(job.getConfiguration(), "localhost");
+            ConfigHelper.setOutputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
+        }
+
+        job.setInputFormatClass(ColumnFamilyInputFormat.class);
+
+        ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
+        ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost");
+        ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
+        ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
+
+        CQLConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3");
+        //this is the user defined filter clauses, you can comment it out if you want count all titles
+        CQLConfigHelper.setInputWhereClauses(job.getConfiguration(), "title='A'");
+        job.waitForCompletion(true);
+        return 0;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/90052d5a/examples/hadoop_cql3_word_count/src/WordCountCounters.java
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/src/WordCountCounters.java b/examples/hadoop_cql3_word_count/src/WordCountCounters.java
new file mode 100644
index 0000000..1cf5539
--- /dev/null
+++ b/examples/hadoop_cql3_word_count/src/WordCountCounters.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.util.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import org.apache.cassandra.hadoop.cql3.ColumnFamilyInputFormat;
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.hadoop.cql3.CQLConfigHelper;
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+
+/**
+ * This sums the word count stored in the input_words_count ColumnFamily for the key "sum".
+ *
+ * Output is written to a text file.
+ */
+public class WordCountCounters extends Configured implements Tool
+{
+    private static final Logger logger = LoggerFactory.getLogger(WordCountCounters.class);
+
+    static final String COUNTER_COLUMN_FAMILY = "input_words_count";
+    private static final String OUTPUT_PATH_PREFIX = "/tmp/word_count_counters";
+
+    /** Hands the arguments to ToolRunner (which handles the generic Hadoop options) and exits with the job's status. */
+    public static void main(String[] args) throws Exception
+    {
+        int status = ToolRunner.run(new Configuration(), new WordCountCounters(), args);
+        System.exit(status);
+    }
+
+    /** Totals the count_num column over all mapped rows; cleanup() emits it once, as "total_count", if it is positive. */
+    public static class SumMapper extends Mapper<Map<String, ByteBuffer>, Map<String, ByteBuffer>, Text, LongWritable>
+    {
+        long sum = -1; // stays negative until the first row has been mapped
+        public void map(Map<String, ByteBuffer> key, Map<String, ByteBuffer> columns, Context context) throws IOException, InterruptedException
+        {
+            sum = Math.max(sum, 0);
+            logger.debug("read " + toString(key) + ":count_num from " + context.getInputSplit());
+            sum += Long.parseLong(ByteBufferUtil.string(columns.get("count_num")));
+        }
+
+        protected void cleanup(Context context) throws IOException, InterruptedException {
+            if (sum > 0)
+                context.write(new Text("total_count"), new LongWritable(sum));
+        }
+
+        // joins the decoded key values with ':'; a decoding failure is logged and the text built so far is returned
+        private String toString(Map<String, ByteBuffer> keys)
+        {
+            StringBuilder joined = new StringBuilder();
+            try
+            {
+                for (ByteBuffer part : keys.values())
+                    joined.append(ByteBufferUtil.string(part)).append(":");
+            }
+            catch (CharacterCodingException e)
+            {
+                logger.error("Failed to print keys", e);
+            }
+            return joined.toString();
+        }
+    }
+
+    
+    /**
+     * Configures and runs the summing job (input: the WordCount output table, output: OUTPUT_PATH_PREFIX).
+     * @return 0 if the job completed successfully, 1 otherwise
+     */
+    public int run(String[] args) throws Exception
+    {
+        Job job = new Job(getConf(), "wordcountcounters");
+        job.setJarByClass(WordCountCounters.class);
+        job.setMapperClass(SumMapper.class);
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(LongWritable.class);
+        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX));
+        job.setInputFormatClass(ColumnFamilyInputFormat.class);
+
+        ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
+        ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost");
+        ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
+        ConfigHelper.setInputColumnFamily(job.getConfiguration(), WordCount.KEYSPACE, WordCount.OUTPUT_COLUMN_FAMILY);
+        CQLConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3");
+
+        return job.waitForCompletion(true) ? 0 : 1;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/90052d5a/examples/hadoop_cql3_word_count/src/WordCountSetup.java
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/src/WordCountSetup.java b/examples/hadoop_cql3_word_count/src/WordCountSetup.java
new file mode 100644
index 0000000..0acb8f7
--- /dev/null
+++ b/examples/hadoop_cql3_word_count/src/WordCountSetup.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WordCountSetup
+{
+    private static final Logger logger = LoggerFactory.getLogger(WordCountSetup.class);
+
+    public static final int TEST_COUNT = 6;
+
+    /** Connects to Cassandra, then creates the keyspace (if missing), the tables with their index, and the sample rows. */
+    public static void main(String[] args) throws Exception
+    {
+        Cassandra.Iface thrift = createConnection();
+        setupKeyspace(thrift);
+        thrift.set_keyspace(WordCount.KEYSPACE);
+        setupTable(thrift);
+        insertData(thrift);
+
+        System.exit(0);
+    }
+
+    /**
+     * Creates the WordCount keyspace (SimpleStrategy, replication factor 1) when describe_keyspace reports it missing, then
+     * sleeps one second per token range returned by describe_ring. NOTE(review): presumably to let the schema settle -- confirm.
+     */
+    private static void setupKeyspace(Cassandra.Iface client)
+            throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException, TException
+    {
+        try
+        {
+            // only probing for existence; the returned definition is not needed
+            client.describe_keyspace(WordCount.KEYSPACE);
+        }
+        catch(NotFoundException e)
+        {
+            logger.info("set up keyspace " + WordCount.KEYSPACE);
+            String query = "CREATE KEYSPACE " + WordCount.KEYSPACE +
+                              " WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1}";
+            client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE);
+
+            int magnitude = client.describe_ring(WordCount.KEYSPACE).size();
+            try
+            {
+                Thread.sleep(1000 * magnitude);
+            }
+            catch (InterruptedException ie)
+            {
+                Thread.currentThread().interrupt(); // keep the interruption visible to whoever handles the exception
+                throw new RuntimeException(ie);
+            }
+        }
+    }
+
+    /**
+     * Creates the input table, a secondary index on its title column, and the output table.
+     * A statement rejected with InvalidRequestException is logged and skipped, so the
+     * remaining statements still run.
+     */
+    private static void setupTable(Cassandra.Iface client)
+            throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException, TException
+    {
+        String inputTable = "CREATE TABLE " + WordCount.KEYSPACE + "."  + WordCount.COLUMN_FAMILY +
+                          " ( user_id text," +
+                          "   category_id text, " +
+                          "   sub_category_id text," +
+                          "   title  text," +
+                          "   body  text," +
+                          "   PRIMARY KEY (user_id, category_id, sub_category_id) ) ";
+
+        String titleIndex = "CREATE INDEX title on " + WordCount.COLUMN_FAMILY + "(title)";
+
+        String outputTable = "CREATE TABLE " + WordCount.KEYSPACE + "."  + WordCount.OUTPUT_COLUMN_FAMILY +
+                " ( row_id text," +
+                "   word text, " +
+                "   count_num text," +
+                "   PRIMARY KEY (row_id, word) ) ";
+
+        runDdl(client, inputTable, "set up table " + WordCount.COLUMN_FAMILY,
+               "failed to create table " + WordCount.KEYSPACE + "."  + WordCount.COLUMN_FAMILY);
+        runDdl(client, titleIndex, "set up index on title column ", "Failed to create index on title");
+        runDdl(client, outputTable, "set up table " + WordCount.OUTPUT_COLUMN_FAMILY,
+               "failed to create table " + WordCount.KEYSPACE + "."  + WordCount.OUTPUT_COLUMN_FAMILY);
+    }
+
+    /**
+     * Logs the announcement at info level and executes the statement as a CQL3 query at consistency level ONE.
+     * An InvalidRequestException is logged at error level with the failure text; any other error propagates.
+     * @param client connection used to execute the statement
+     * @param ddl the CQL statement to execute
+     * @param announcement message logged before the statement is sent
+     * @param failure message logged when the statement is rejected as invalid
+     */
+    private static void runDdl(Cassandra.Iface client, String ddl, String announcement, String failure)
+            throws UnavailableException, TimedOutException, SchemaDisagreementException, TException
+    {
+        try
+        {
+            logger.info(announcement);
+            client.execute_cql3_query(ByteBufferUtil.bytes(ddl), Compression.NONE, ConsistencyLevel.ONE);
+        }
+        catch (InvalidRequestException e)
+        {
+            logger.error(failure, e);
+        }
+    }
+    
+    /** Connects to -Dcassandra.host / -Dcassandra.port, defaulting to localhost and 9160; warns when either is unset. */
+    private static Cassandra.Iface createConnection() throws TTransportException
+    {
+        String host = System.getProperty("cassandra.host");
+        String port = System.getProperty("cassandra.port");
+        if (host == null || port == null)
+            logger.warn("cassandra.host or cassandra.port is not defined, using default");
+        return createConnection(host == null ? "localhost" : host, Integer.valueOf(port == null ? "9160" : port));
+    }
+
+    /** Opens a framed Thrift transport to host:port and wraps it in a binary-protocol client. */
+    private static Cassandra.Client createConnection(String host, Integer port) throws TTransportException
+    {
+        TTransport framed = new TFramedTransport(new TSocket(host, port));
+        framed.open();
+
+        // the transport is handed out open; this method does not close it
+        return new Cassandra.Client(new TBinaryProtocol(framed));
+    }
+
+    /**
+     * Fills the input table through one prepared INSERT: for every category 1-4, user 1-443 and
+     * sub-category 1-3 a row is written whose title and body are the category's titleData()/bodyData() entries.
+     */
+    private static void insertData(Cassandra.Iface client)
+            throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException, TException
+    {
+        String query = "INSERT INTO " + WordCount.COLUMN_FAMILY +
+                           "(user_id, category_id, sub_category_id, title, body ) " +
+                           " values (?, ?, ?, ?, ?) ";
+        CqlPreparedResult prepared = client.prepare_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE);
+
+        String[] titles = titleData();
+        String[] bodies = bodyData();
+        for (int category = 1; category < 5; category++)
+        {
+            for (int user = 1; user < 444; user++)
+            {
+                for (int sub = 1; sub < 4; sub++)
+                {
+                    List<ByteBuffer> row = Arrays.asList(
+                            ByteBufferUtil.bytes(String.valueOf(user)),
+                            ByteBufferUtil.bytes(String.valueOf(category)),
+                            ByteBufferUtil.bytes(String.valueOf(sub)),
+                            ByteBufferUtil.bytes(titles[category]),
+                            ByteBufferUtil.bytes(bodies[category]));
+                    client.execute_prepared_cql3_query(prepared.itemId, row, ConsistencyLevel.ONE);
+                }
+            }
+        }
+    }
+
+    /** Sample body text, one line per slot (public domain, source http://en.wikisource.org/wiki/If%E2%80%94). */
+    private static String[] bodyData()
+    {
+        return new String[]{ "", // slot 0 is a placeholder: insertData() only reads slots 1-4
+                "If you can keep your head when all about you",
+                "Are losing theirs and blaming it on you",
+                "If you can trust yourself when all men doubt you,",
+                "But make allowance for their doubting too:",
+                "If you can wait and not be tired by waiting,"
+        };
+    }
+
+    /** Sample titles, one letter per slot, index-aligned with bodyData(). */
+    private static String[] titleData()
+    {
+        return new String[]{ "", // slot 0 is a placeholder: insertData() only reads slots 1-4
+                "A",
+                "B",
+                "C",
+                "D",
+                "E"
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/90052d5a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
new file mode 100644
index 0000000..1c8fd0b
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyInputFormat.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hadoop;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.cassandra.auth.IAuthenticator;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.thrift.AuthenticationRequest;
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.CfSplit;
+import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.thrift.KeyRange;
+import org.apache.cassandra.thrift.TokenRange;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.thrift.TApplicationException;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+public abstract class AbstractColumnFamilyInputFormat<K, Y> extends InputFormat<K, Y> implements org.apache.hadoop.mapred.InputFormat<K, Y>
+{
+    private static final Logger logger = LoggerFactory.getLogger(AbstractColumnFamilyInputFormat.class);
+
+    public static final String MAPRED_TASK_ID = "mapred.task.id";
+    // The simple fact that we need this is because the old Hadoop API wants us to "write"
+    // to the key and value whereas the new asks for it.
+    // I chose 8kb as the default max key size (instantiated only once), but you can
+    // override it in your jobConf with this setting.
+    public static final String CASSANDRA_HADOOP_MAX_KEY_SIZE = "cassandra.hadoop.max_key_size";
+    public static final int    CASSANDRA_HADOOP_MAX_KEY_SIZE_DEFAULT = 8192;
+
+    private String keyspace;
+    private String cfName;
+    private IPartitioner partitioner;
+
+    /** Throws UnsupportedOperationException when the input keyspace/column family, initial address or partitioner is unset. */
+    protected void validateConfiguration(Configuration conf)
+    {
+        if (ConfigHelper.getInputKeyspace(conf) == null || ConfigHelper.getInputColumnFamily(conf) == null)
+            throw new UnsupportedOperationException("you must set the keyspace and columnfamily with setInputColumnFamily()");
+        // this is the *input* side, so the message must name the input address, not the output one
+        if (ConfigHelper.getInputInitialAddress(conf) == null)
+            throw new UnsupportedOperationException("You must set the initial input address to a Cassandra node with setInputInitialAddress");
+        if (ConfigHelper.getInputPartitioner(conf) == null)
+            throw new UnsupportedOperationException("You must set the Cassandra partitioner class with setInputPartitioner");
+    }
+
+    /** Opens a Thrift client to location:port, sets the input keyspace and logs in when an input user name is configured. */
+    public static Cassandra.Client createAuthenticatedClient(String location, int port, Configuration conf) throws Exception
+    {
+        logger.debug("Creating authenticated client for CF input format");
+        TTransport transport = ConfigHelper.getClientTransportFactory(conf).openTransport(location, port, conf);
+        Cassandra.Client client = new Cassandra.Client(new TBinaryProtocol(transport, true, true));
+        client.set_keyspace(ConfigHelper.getInputKeyspace(conf));
+
+        String user = ConfigHelper.getInputKeyspaceUserName(conf);
+        if (user != null)
+        {
+            Map<String, String> creds = new HashMap<String, String>();
+            creds.put(IAuthenticator.USERNAME_KEY, user);
+            creds.put(IAuthenticator.PASSWORD_KEY, ConfigHelper.getInputKeyspacePassword(conf));
+            client.login(new AuthenticationRequest(creds));
+        }
+
+        logger.debug("Authenticated client for CF input format created successfully");
+        return client;
+    }
+
+    /**
+     * Computes the input splits of a job: fetches the ring, optionally narrows it to the configured
+     * job key range, requests the sub-splits of every remaining token range in parallel and returns
+     * all of them in shuffled order.
+     * @throws IOException if any of the per-range split requests fails
+     */
+    public List<InputSplit> getSplits(JobContext context) throws IOException
+    {
+        Configuration conf = context.getConfiguration();
+        validateConfiguration(conf);
+
+        // canonical ranges and nodes holding replicas
+        List<TokenRange> masterRangeNodes = getRangeMap(conf);
+
+        keyspace = ConfigHelper.getInputKeyspace(conf);
+        cfName = ConfigHelper.getInputColumnFamily(conf);
+        partitioner = ConfigHelper.getInputPartitioner(conf);
+        logger.debug("partitioner is " + partitioner);
+
+        // canonical ranges, split into pieces, fetching the splits in parallel
+        ExecutorService executor = Executors.newCachedThreadPool();
+        List<InputSplit> splits = new ArrayList<InputSplit>();
+        try
+        {
+            List<Future<List<InputSplit>>> splitfutures = new ArrayList<Future<List<InputSplit>>>();
+            KeyRange jobKeyRange = ConfigHelper.getInputKeyRange(conf);
+            Range<Token> jobRange = null;
+            if (jobKeyRange != null)
+            {
+                if (jobKeyRange.start_key == null)
+                    logger.warn("ignoring jobKeyRange specified without start_key");
+                else
+                {
+                    if (!partitioner.preservesOrder())
+                        throw new UnsupportedOperationException("KeyRange based on keys can only be used with a order preserving paritioner");
+                    if (jobKeyRange.start_token != null || jobKeyRange.end_token != null)
+                        throw new IllegalArgumentException("only start_key supported");
+                    jobRange = new Range<Token>(partitioner.getToken(jobKeyRange.start_key),
+                                                partitioner.getToken(jobKeyRange.end_key),
+                                                partitioner);
+                }
+            }
+
+            for (TokenRange range : masterRangeNodes)
+            {
+                if (jobRange == null)
+                {
+                    // for each range, pick a live owner and ask it to compute bite-sized splits
+                    splitfutures.add(executor.submit(new SplitCallable(range, conf)));
+                }
+                else
+                {
+                    Range<Token> dhtRange = new Range<Token>(partitioner.getTokenFactory().fromString(range.start_token),
+                                                             partitioner.getTokenFactory().fromString(range.end_token),
+                                                             partitioner);
+                    if (dhtRange.intersects(jobRange))
+                    {
+                        for (Range<Token> intersection: dhtRange.intersectionWith(jobRange))
+                        {
+                            // every task gets a private copy: rewriting the shared range would race with tasks already submitted
+                            TokenRange piece = new TokenRange(range);
+                            piece.start_token = partitioner.getTokenFactory().toString(intersection.left);
+                            piece.end_token = partitioner.getTokenFactory().toString(intersection.right);
+                            splitfutures.add(executor.submit(new SplitCallable(piece, conf)));
+                        }
+                    }
+                }
+            }
+
+            // wait until we have all the results back
+            for (Future<List<InputSplit>> futureInputSplits : splitfutures)
+            {
+                try
+                {
+                    splits.addAll(futureInputSplits.get());
+                }
+                catch (Exception e)
+                {
+                    throw new IOException("Could not get input splits", e);
+                }
+            }
+        }
+        finally
+        {
+            executor.shutdownNow();
+        }
+
+        assert splits.size() > 0;
+        Collections.shuffle(splits, new Random(System.nanoTime()));
+        return splits;
+    }
+
+    /**
+     * Gets a token range and splits it up according to the suggested
+     * size into input splits that Hadoop can use.
+     */
+    class SplitCallable implements Callable<List<InputSplit>>
+    {
+        private final TokenRange range;
+        private final Configuration conf;
+
+        public SplitCallable(TokenRange tr, Configuration conf)
+        {
+            this.range = tr;
+            this.conf = conf;
+        }
+
+        /**
+         * Fetches the sub-splits of the token range and turns each one (unwrapped first when it
+         * wraps around) into ColumnFamilySplits that carry the host names of the range's endpoints.
+         */
+        public List<InputSplit> call() throws Exception
+        {
+            List<CfSplit> subSplits = getSubSplits(keyspace, cfName, range, conf);
+            assert range.rpc_endpoints.size() == range.endpoints.size() : "rpc_endpoints size must match endpoints size";
+
+            // hadoop needs hostname, not ip; a null or 0.0.0.0 rpc address falls back to the endpoint address
+            String[] hosts = range.endpoints.toArray(new String[range.endpoints.size()]);
+            for (int i = 0; i < range.rpc_endpoints.size(); i++)
+            {
+                String address = range.rpc_endpoints.get(i);
+                if (address == null || "0.0.0.0".equals(address))
+                    address = range.endpoints.get(i);
+                hosts[i] = InetAddress.getByName(address).getHostName();
+            }
+
+            Token.TokenFactory factory = partitioner.getTokenFactory();
+            ArrayList<InputSplit> result = new ArrayList<InputSplit>();
+            for (CfSplit subSplit : subSplits)
+            {
+                Range<Token> whole = new Range<Token>(factory.fromString(subSplit.getStart_token()),
+                                                      factory.fromString(subSplit.getEnd_token()),
+                                                      partitioner);
+                List<Range<Token>> pieces = whole.isWrapAround() ? whole.unwrap() : ImmutableList.of(whole);
+                for (Range<Token> piece : pieces)
+                {
+                    ColumnFamilySplit split = new ColumnFamilySplit(factory.toString(piece.left),
+                                                                    factory.toString(piece.right),
+                                                                    subSplit.getRow_count(),
+                                                                    hosts);
+                    logger.debug("adding " + split);
+                    result.add(split);
+                }
+            }
+
+            return result;
+        }
+    }
+
+    /**
+     * Asks the range's endpoints in turn for its sub-splits; an IOException moves on to the next endpoint, Thrift errors are rethrown unchecked.
+     * @throws IOException if every endpoint failed with an IOException
+     */
+    private List<CfSplit> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException
+    {
+        int splitsize = ConfigHelper.getInputSplitSize(conf);
+        for (int i = 0; i < range.rpc_endpoints.size(); i++)
+        {
+            String rpcAddress = range.rpc_endpoints.get(i);
+            boolean unusable = rpcAddress == null || rpcAddress.equals("0.0.0.0");
+            String host = unusable ? range.endpoints.get(i) : rpcAddress;
+
+            try
+            {
+                Cassandra.Client client = ConfigHelper.createConnection(conf, host, ConfigHelper.getInputRpcPort(conf));
+                client.set_keyspace(keyspace);
+
+                try
+                {
+                    return client.describe_splits_ex(cfName, range.start_token, range.end_token, splitsize);
+                }
+                catch (TApplicationException e)
+                {
+                    // fallback to guessing split size if talking to a server without describe_splits_ex method
+                    if (e.getType() != TApplicationException.UNKNOWN_METHOD)
+                        throw e;
+                    return tokenListToSplits(client.describe_splits(cfName, range.start_token, range.end_token, splitsize), splitsize);
+                }
+            }
+            catch (IOException e)
+            {
+                logger.debug("failed connect to endpoint " + host, e);
+            }
+            catch (InvalidRequestException e)
+            {
+                throw new RuntimeException(e);
+            }
+            catch (TException e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+        throw new IOException("failed connecting to all endpoints " + StringUtils.join(range.endpoints, ","));
+    }
+
+    private List<CfSplit> tokenListToSplits(List<String> splitTokens, int splitsize)
+    {   // pairs up consecutive tokens into splits of nominal size splitsize; fewer than two tokens give an empty list
+        List<CfSplit> splits = Lists.newArrayListWithExpectedSize(Math.max(0, splitTokens.size() - 1)); // hint must not go negative
+        for (int j = 0; j < splitTokens.size() - 1; j++)
+            splits.add(new CfSplit(splitTokens.get(j), splitTokens.get(j + 1), splitsize));
+        return splits;
+    }
+
+    /**
+     * Returns the token ranges of the input keyspace as reported by describe_ring; Thrift failures are rethrown as RuntimeException.
+     */
+    private List<TokenRange> getRangeMap(Configuration conf) throws IOException
+    {
+        Cassandra.Client client = ConfigHelper.getClientFromInputAddressList(conf);
+        try
+        {
+            return client.describe_ring(ConfigHelper.getInputKeyspace(conf));
+        }
+        catch (InvalidRequestException e)
+        {
+            throw new RuntimeException(e);
+        }
+        catch (TException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
+    //
+    // Old Hadoop API
+    //
+    /** Old-API adapter: delegates to the new-API getSplits() and copies the result into an array; numSplits is not used. */
+    public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException
+    {
+        List<org.apache.hadoop.mapreduce.InputSplit> fresh = this.getSplits(new TaskAttemptContext(jobConf, new TaskAttemptID()));
+        org.apache.hadoop.mapred.InputSplit[] legacy = new org.apache.hadoop.mapred.InputSplit[fresh.size()];
+        for (int slot = 0; slot < legacy.length; slot++)
+            legacy[slot] = (ColumnFamilySplit) fresh.get(slot);
+        return legacy;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/90052d5a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java
new file mode 100644
index 0000000..5a03777
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyOutputFormat.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hadoop;
+
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.auth.IAuthenticator;
+import org.apache.cassandra.thrift.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TTransport;
+
+/**
+ * <code>AbstractColumnFamilyOutputFormat</code> is the abstract base for
+ * Hadoop-specific OutputFormats that allow reduce tasks to store keys (and
+ * corresponding values) as Cassandra rows (and respective columns) in a given
+ * ColumnFamily.
+ *
+ * <p>
+ * As is the case with the {@link ColumnFamilyInputFormat}, you need to set the
+ * Keyspace and ColumnFamily in your Hadoop job Configuration, together with
+ * the output partitioner and an initial output address. The {@link ConfigHelper}
+ * class, through methods such as {@link ConfigHelper#setOutputColumnFamily},
+ * is provided to make this simple.
+ * </p>
+ *
+ * <p>
+ * For the sake of performance, this class employs a lazy write-back caching
+ * mechanism, where its record writer batches mutations created based on the
+ * reduce's inputs (in a task-specific map), and periodically makes the changes
+ * official by sending a batch mutate request to Cassandra.
+ * </p>
+ * @param <Y> type of the values handed to this output format (<code>K</code> is the key type)
+ */
+public abstract class AbstractColumnFamilyOutputFormat<K, Y> extends OutputFormat<K, Y> implements org.apache.hadoop.mapred.OutputFormat<K, Y>
+{
+    public static final String BATCH_THRESHOLD = "mapreduce.output.columnfamilyoutputformat.batch.threshold"; // job configuration key read by the record writer
+    public static final String QUEUE_SIZE = "mapreduce.output.columnfamilyoutputformat.queue.size"; // job configuration key read by the record writer
+    private static final Logger logger = LoggerFactory.getLogger(AbstractColumnFamilyOutputFormat.class);
+
+
+    /**
+     * Check for validity of the output-specification for the job.
+     *
+     * @param context
+     *            information about the job
+     * @throws UnsupportedOperationException
+     *             when the output keyspace, partitioner or initial address is not set
+     */
+    public void checkOutputSpecs(JobContext context)
+    {
+        checkOutputSpecs(context.getConfiguration());
+    }
+
+    protected void checkOutputSpecs(Configuration conf)
+    {
+        if (ConfigHelper.getOutputKeyspace(conf) == null)
+            throw new UnsupportedOperationException("You must set the keyspace with setOutputKeyspace()");
+        if (ConfigHelper.getOutputPartitioner(conf) == null)
+            throw new UnsupportedOperationException("You must set the output partitioner to the one used by your Cassandra cluster");
+        if (ConfigHelper.getOutputInitialAddress(conf) == null)
+            throw new UnsupportedOperationException("You must set the initial output address to a Cassandra node");
+    }
+
+    /** Fills the deprecated OutputFormat interface for streaming; the filesystem argument is not used. */
+    @Deprecated
+    public void checkOutputSpecs(org.apache.hadoop.fs.FileSystem filesystem, org.apache.hadoop.mapred.JobConf job) throws IOException
+    {
+        checkOutputSpecs(job);
+    }
+
+    /**
+     * The OutputCommitter for this format does not write any data to the DFS.
+     *
+     * @param context
+     *            the task context; not consulted by this implementation
+     * @return a new committer whose callbacks all do nothing
+     * @throws IOException declared in the signature; not thrown by this implementation
+     * @throws InterruptedException declared in the signature; not thrown by this implementation
+     */
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException
+    {
+        return new NullOutputCommitter();
+    }
+
+    /**
+     * Connects to the given server:port and returns a client based on the given socket that points to the configured
+     * keyspace, and is logged in with the configured credentials when an output user name is set.
+     *
+     * @param host fully qualified host name to connect to
+     * @param port RPC port of the server
+     * @param conf a job configuration
+     * @return a cassandra client
+     * @throws Exception set of thrown exceptions may be implementation defined,
+     *                   depending on the used transport factory
+     */
+    public static Cassandra.Client createAuthenticatedClient(String host, int port, Configuration conf) throws Exception
+    {
+        logger.debug("Creating authenticated client for CF output format");
+        TTransport transport = ConfigHelper.getClientTransportFactory(conf).openTransport(host, port, conf);
+        TProtocol binaryProtocol = new TBinaryProtocol(transport, true, true);
+        Cassandra.Client client = new Cassandra.Client(binaryProtocol);
+        client.set_keyspace(ConfigHelper.getOutputKeyspace(conf));
+        if (ConfigHelper.getOutputKeyspaceUserName(conf) != null)
+        {
+            Map<String, String> creds = new HashMap<String, String>();
+            creds.put(IAuthenticator.USERNAME_KEY, ConfigHelper.getOutputKeyspaceUserName(conf));
+            creds.put(IAuthenticator.PASSWORD_KEY, ConfigHelper.getOutputKeyspacePassword(conf));
+            AuthenticationRequest authRequest = new AuthenticationRequest(creds);
+            client.login(authRequest);
+        }
+        logger.debug("Authenticated client for CF output format created successfully");
+        return client;
+    }
+
+    /**
+     * An {@link OutputCommitter} that does nothing and reports that no task commit is needed.
+     */
+    private static class NullOutputCommitter extends OutputCommitter
+    {
+        public void abortTask(TaskAttemptContext taskContext) { }
+
+        public void cleanupJob(JobContext jobContext) { }
+
+        public void commitTask(TaskAttemptContext taskContext) { }
+
+        public boolean needsTaskCommit(TaskAttemptContext taskContext)
+        {
+            return false;
+        }
+
+        public void setupJob(JobContext jobContext) { }
+
+        public void setupTask(TaskAttemptContext taskContext) { }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/90052d5a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java
new file mode 100644
index 0000000..6428db3
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/AbstractColumnFamilyRecordWriter.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.hadoop;
+
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.cassandra.client.RingCache;
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.thrift.transport.TTransport;
+
+
+/**
+ * Abstract base for record writers that map the output &lt;key, value&gt;
+ * pairs to a Cassandra column family. In particular, such a writer applies
+ * all mutations in the value, which it associates with the key, and in turn
+ * the responsible endpoint.
+ *
+ * <p>
+ * Furthermore, this writer groups the mutations by the endpoint responsible for
+ * the rows being affected. This allows the mutations to be executed in parallel,
+ * directly to a responsible endpoint.
+ * </p>
+ *
+ * @see ColumnFamilyOutputFormat
+ */
+public abstract class AbstractColumnFamilyRecordWriter<K, Y> extends RecordWriter<K, Y> implements org.apache.hadoop.mapred.RecordWriter<K, Y>
+{
+    // The configuration this writer is associated with.
+    protected final Configuration conf;
+
+    // The ring cache that describes the token ranges each node in the ring is
+    // responsible for. This is what allows us to group the mutations by the
+    // endpoints they should be targeted at. The targeted endpoint essentially
+    // acts as the primary replica for the rows being affected by the
+    // mutations.
+    protected final RingCache ringCache;
+
+    // The number of mutations to buffer per endpoint (QUEUE_SIZE; defaults to 32 per available processor)
+    protected final int queueSize;
+
+    protected final long batchThreshold; // read from BATCH_THRESHOLD, default 32
+
+    protected final ConsistencyLevel consistencyLevel;
+    protected Progressable progressable;
+
+    protected AbstractColumnFamilyRecordWriter(Configuration conf)
+    {
+        this.conf = conf;
+        this.ringCache = new RingCache(conf);
+        this.queueSize = conf.getInt(AbstractColumnFamilyOutputFormat.QUEUE_SIZE, 32 * FBUtilities.getAvailableProcessors());
+        batchThreshold = conf.getLong(AbstractColumnFamilyOutputFormat.BATCH_THRESHOLD, 32);
+        consistencyLevel = ConsistencyLevel.valueOf(ConfigHelper.getWriteConsistencyLevel(conf));
+    }
+    
+    /**
+     * Close this <code>RecordWriter</code> to future operations, but not before
+     * flushing out the batched mutations.
+     *
+     * @param context the context of the task (unused; the work is delegated to {@link #close()})
+     * @throws IOException if {@link #close()} fails
+     */
+    public void close(TaskAttemptContext context) throws IOException, InterruptedException
+    {
+        close();
+    }
+
+    /** Fills the deprecated RecordWriter interface for streaming. */
+    @Deprecated
+    public void close(org.apache.hadoop.mapred.Reporter reporter) throws IOException
+    {
+        close();
+    }
+    
+    protected abstract void close() throws IOException; // invoked by both close overloads above
+
+    /**
+     * A client thread that connects to the list of endpoints for a particular range. Mutations for keys in that
+     * range are sent to this client via a queue. Note: this class's <code>K</code> shadows the enclosing class's.
+     */
+    public abstract class AbstractRangeClient<K> extends Thread
+    {
+        // The list of endpoints for this range
+        protected final List<InetAddress> endpoints;
+        // A bounded queue of incoming mutations for this range
+        protected final BlockingQueue<Pair<ByteBuffer, K>> queue = new ArrayBlockingQueue<Pair<ByteBuffer, K>>(queueSize);
+
+        protected volatile boolean run = true; // cleared by close() to stop the run loop
+        // we want the caller to know if something went wrong, so we record any unrecoverable exception while writing
+        // so we can throw it on the caller's stack when it calls put() again, or if there are no more put calls,
+        // when the client is closed.
+        protected volatile IOException lastException;
+
+        protected Cassandra.Client client;
+
+        /**
+         * Constructs an {@link AbstractRangeClient} for the given endpoints.
+         * @param endpoints the possible endpoints to execute the mutations on
+         */
+        public AbstractRangeClient(List<InetAddress> endpoints)
+        {
+            super("client-" + endpoints);
+            this.endpoints = endpoints;
+         }
+
+        /**
+         * Enqueues the given value for sending, retrying every 100 ms while the queue is full; rethrows a recorded lastException.
+         */
+        public void put(Pair<ByteBuffer, K> value) throws IOException
+        {
+            while (true)
+            {
+                if (lastException != null)
+                    throw lastException;
+                try
+                {
+                    if (queue.offer(value, 100, TimeUnit.MILLISECONDS))
+                        break;
+                }
+                catch (InterruptedException e)
+                {
+                    throw new AssertionError(e);
+                }
+            }
+        }
+
+        public void close() throws IOException
+        {
+            // stop the run loop.  this will result in closeInternal being called by the time join() finishes.
+            run = false;
+            interrupt();
+            try
+            {
+                this.join();
+            }
+            catch (InterruptedException e)
+            {
+                throw new AssertionError(e);
+            }
+
+            if (lastException != null)
+                throw lastException;
+        }
+
+        protected void closeInternal()
+        {
+            if (client != null) // no client, nothing to release
+            {
+                TTransport transport = client.getOutputProtocol().getTransport();
+                if (transport.isOpen())
+                    transport.close();
+            }
+        }
+
+        /**
+         * Loops collecting mutations from the queue and sending to Cassandra
+         */
+        public abstract void run();
+
+        @Override
+        public String toString()
+        {
+            return "#<Client for " + endpoints.toString() + ">";
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/90052d5a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
index 404ef60..1b5a4e2 100644
--- a/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
@@ -18,35 +18,14 @@
 package org.apache.cassandra.hadoop;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Random;
-import java.util.SortedMap;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.StringUtils;
-import org.apache.thrift.TApplicationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.*;
 
 import org.apache.cassandra.db.Column;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.thrift.*;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.*;
-import org.apache.thrift.TException;
 
 /**
  * Hadoop InputFormat allowing map/reduce against Cassandra rows within one ColumnFamily.
@@ -65,283 +44,14 @@ import org.apache.thrift.TException;
  *
  * The default split size is 64k rows.
  */
-public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<ByteBuffer, Column>>
-    implements org.apache.hadoop.mapred.InputFormat<ByteBuffer, SortedMap<ByteBuffer, Column>>
+public class ColumnFamilyInputFormat extends AbstractColumnFamilyInputFormat<ByteBuffer, SortedMap<ByteBuffer, Column>>
 {
-    private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyInputFormat.class);
-
-    public static final String MAPRED_TASK_ID = "mapred.task.id";
-    // The simple fact that we need this is because the old Hadoop API wants us to "write"
-    // to the key and value whereas the new asks for it.
-    // I choose 8kb as the default max key size (instanciated only once), but you can
-    // override it in your jobConf with this setting.
-    public static final String CASSANDRA_HADOOP_MAX_KEY_SIZE = "cassandra.hadoop.max_key_size";
-    public static final int    CASSANDRA_HADOOP_MAX_KEY_SIZE_DEFAULT = 8192;
-
-    private String keyspace;
-    private String cfName;
-    private IPartitioner partitioner;
-
-    private static void validateConfiguration(Configuration conf)
-    {
-        if (ConfigHelper.getInputKeyspace(conf) == null || ConfigHelper.getInputColumnFamily(conf) == null)
-        {
-            throw new UnsupportedOperationException("you must set the keyspace and columnfamily with setInputColumnFamily()");
-        }
-        if (ConfigHelper.getInputSlicePredicate(conf) == null)
-        {
-            throw new UnsupportedOperationException("you must set the predicate with setInputSlicePredicate");
-        }
-        if (ConfigHelper.getInputInitialAddress(conf) == null)
-            throw new UnsupportedOperationException("You must set the initial output address to a Cassandra node with setInputInitialAddress");
-        if (ConfigHelper.getInputPartitioner(conf) == null)
-            throw new UnsupportedOperationException("You must set the Cassandra partitioner class with setInputPartitioner");
-    }
-
-    public List<InputSplit> getSplits(JobContext context) throws IOException
-    {
-        Configuration conf = context.getConfiguration();
-
-        validateConfiguration(conf);
-
-        // cannonical ranges and nodes holding replicas
-        List<TokenRange> masterRangeNodes = getRangeMap(conf);
-
-        keyspace = ConfigHelper.getInputKeyspace(context.getConfiguration());
-        cfName = ConfigHelper.getInputColumnFamily(context.getConfiguration());
-        partitioner = ConfigHelper.getInputPartitioner(context.getConfiguration());
-        logger.debug("partitioner is " + partitioner);
-
-        // cannonical ranges, split into pieces, fetching the splits in parallel
-        ExecutorService executor = Executors.newCachedThreadPool();
-        List<InputSplit> splits = new ArrayList<InputSplit>();
-
-        try
-        {
-            List<Future<List<InputSplit>>> splitfutures = new ArrayList<Future<List<InputSplit>>>();
-            KeyRange jobKeyRange = ConfigHelper.getInputKeyRange(conf);
-            Range<Token> jobRange = null;
-            if (jobKeyRange != null)
-            {
-                if (jobKeyRange.start_key == null)
-                {
-                    logger.warn("ignoring jobKeyRange specified without start_key");
-                }
-                else
-                {
-                    if (!partitioner.preservesOrder())
-                        throw new UnsupportedOperationException("KeyRange based on keys can only be used with a order preserving paritioner");
-                    if (jobKeyRange.start_token != null)
-                        throw new IllegalArgumentException("only start_key supported");
-                    if (jobKeyRange.end_token != null)
-                        throw new IllegalArgumentException("only start_key supported");
-                    jobRange = new Range<Token>(partitioner.getToken(jobKeyRange.start_key),
-                                                partitioner.getToken(jobKeyRange.end_key),
-                                                partitioner);
-                }
-            }
-
-            for (TokenRange range : masterRangeNodes)
-            {
-                if (jobRange == null)
-                {
-                    // for each range, pick a live owner and ask it to compute bite-sized splits
-                    splitfutures.add(executor.submit(new SplitCallable(range, conf)));
-                }
-                else
-                {
-                    Range<Token> dhtRange = new Range<Token>(partitioner.getTokenFactory().fromString(range.start_token),
-                                                             partitioner.getTokenFactory().fromString(range.end_token),
-                                                             partitioner);
-
-                    if (dhtRange.intersects(jobRange))
-                    {
-                        for (Range<Token> intersection: dhtRange.intersectionWith(jobRange))
-                        {
-                            range.start_token = partitioner.getTokenFactory().toString(intersection.left);
-                            range.end_token = partitioner.getTokenFactory().toString(intersection.right);
-                            // for each range, pick a live owner and ask it to compute bite-sized splits
-                            splitfutures.add(executor.submit(new SplitCallable(range, conf)));
-                        }
-                    }
-                }
-            }
-
-            // wait until we have all the results back
-            for (Future<List<InputSplit>> futureInputSplits : splitfutures)
-            {
-                try
-                {
-                    splits.addAll(futureInputSplits.get());
-                }
-                catch (Exception e)
-                {
-                    throw new IOException("Could not get input splits", e);
-                }
-            }
-        }
-        finally
-        {
-            executor.shutdownNow();
-        }
-
-        assert splits.size() > 0;
-        Collections.shuffle(splits, new Random(System.nanoTime()));
-        return splits;
-    }
-
-    /**
-     * Gets a token range and splits it up according to the suggested
-     * size into input splits that Hadoop can use.
-     */
-    class SplitCallable implements Callable<List<InputSplit>>
-    {
-
-        private final TokenRange range;
-        private final Configuration conf;
-
-        public SplitCallable(TokenRange tr, Configuration conf)
-        {
-            this.range = tr;
-            this.conf = conf;
-        }
-
-        public List<InputSplit> call() throws Exception
-        {
-            ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
-            List<CfSplit> subSplits = getSubSplits(keyspace, cfName, range, conf);
-            assert range.rpc_endpoints.size() == range.endpoints.size() : "rpc_endpoints size must match endpoints size";
-            // turn the sub-ranges into InputSplits
-            String[] endpoints = range.endpoints.toArray(new String[range.endpoints.size()]);
-            // hadoop needs hostname, not ip
-            int endpointIndex = 0;
-            for (String endpoint: range.rpc_endpoints)
-            {
-                String endpoint_address = endpoint;
-                if (endpoint_address == null || endpoint_address.equals("0.0.0.0"))
-                    endpoint_address = range.endpoints.get(endpointIndex);
-                endpoints[endpointIndex++] = InetAddress.getByName(endpoint_address).getHostName();
-            }
-
-            Token.TokenFactory factory = partitioner.getTokenFactory();
-            for (CfSplit subSplit : subSplits)
-            {
-                Token left = factory.fromString(subSplit.getStart_token());
-                Token right = factory.fromString(subSplit.getEnd_token());
-                Range<Token> range = new Range<Token>(left, right, partitioner);
-                List<Range<Token>> ranges = range.isWrapAround() ? range.unwrap() : ImmutableList.of(range);
-                for (Range<Token> subrange : ranges)
-                {
-                    ColumnFamilySplit split =
-                            new ColumnFamilySplit(
-                                    factory.toString(subrange.left),
-                                    factory.toString(subrange.right),
-                                    subSplit.getRow_count(),
-                                    endpoints);
-
-                    logger.debug("adding " + split);
-                    splits.add(split);
-                }
-            }
-            return splits;
-        }
-    }
-
-    private List<CfSplit> getSubSplits(String keyspace, String cfName, TokenRange range, Configuration conf) throws IOException
-    {
-        int splitsize = ConfigHelper.getInputSplitSize(conf);
-        for (int i = 0; i < range.rpc_endpoints.size(); i++)
-        {
-            String host = range.rpc_endpoints.get(i);
-
-            if (host == null || host.equals("0.0.0.0"))
-                host = range.endpoints.get(i);
-
-            try
-            {
-                Cassandra.Client client = ConfigHelper.createConnection(conf, host, ConfigHelper.getInputRpcPort(conf));
-                client.set_keyspace(keyspace);
-
-                try
-                {
-                    return client.describe_splits_ex(cfName, range.start_token, range.end_token, splitsize);
-                }
-                catch (TApplicationException e)
-                {
-                    // fallback to guessing split size if talking to a server without describe_splits_ex method
-                    if (e.getType() == TApplicationException.UNKNOWN_METHOD)
-                    {
-                        List<String> splitPoints = client.describe_splits(cfName, range.start_token, range.end_token, splitsize);
-                        return tokenListToSplits(splitPoints, splitsize);
-                    }
-                    throw e;
-                }
-            }
-            catch (IOException e)
-            {
-                logger.debug("failed connect to endpoint " + host, e);
-            }
-            catch (InvalidRequestException e)
-            {
-                throw new RuntimeException(e);
-            }
-            catch (TException e)
-            {
-                throw new RuntimeException(e);
-            }
-        }
-        throw new IOException("failed connecting to all endpoints " + StringUtils.join(range.endpoints, ","));
-    }
-
-
-    private List<CfSplit> tokenListToSplits(List<String> splitTokens, int splitsize)
-    {
-        List<CfSplit> splits = Lists.newArrayListWithExpectedSize(splitTokens.size() - 1);
-        for (int j = 0; j < splitTokens.size() - 1; j++)
-            splits.add(new CfSplit(splitTokens.get(j), splitTokens.get(j + 1), splitsize));
-        return splits;
-    }
-
-
-    private List<TokenRange> getRangeMap(Configuration conf) throws IOException
-    {
-        Cassandra.Client client = ConfigHelper.getClientFromInputAddressList(conf);
-
-        List<TokenRange> map;
-        try
-        {
-            map = client.describe_ring(ConfigHelper.getInputKeyspace(conf));
-        }
-        catch (InvalidRequestException e)
-        {
-            throw new RuntimeException(e);
-        }
-        catch (TException e)
-        {
-            throw new RuntimeException(e);
-        }
-        return map;
-    }
-
+    
     public RecordReader<ByteBuffer, SortedMap<ByteBuffer, Column>> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException
     {
         return new ColumnFamilyRecordReader();
     }
 
-
-    //
-    // Old Hadoop API
-    //
-    public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException
-    {
-        TaskAttemptContext tac = new TaskAttemptContext(jobConf, new TaskAttemptID());
-        List<org.apache.hadoop.mapreduce.InputSplit> newInputSplits = this.getSplits(tac);
-        org.apache.hadoop.mapred.InputSplit[] oldInputSplits = new org.apache.hadoop.mapred.InputSplit[newInputSplits.size()];
-        for (int i = 0; i < newInputSplits.size(); i++)
-            oldInputSplits[i] = (ColumnFamilySplit)newInputSplits.get(i);
-        return oldInputSplits;
-    }
-
     public org.apache.hadoop.mapred.RecordReader<ByteBuffer, SortedMap<ByteBuffer, Column>> getRecordReader(org.apache.hadoop.mapred.InputSplit split, JobConf jobConf, final Reporter reporter) throws IOException
     {
         TaskAttemptContext tac = new TaskAttemptContext(jobConf, TaskAttemptID.forName(jobConf.get(MAPRED_TASK_ID)))
@@ -357,5 +67,16 @@ public class ColumnFamilyInputFormat extends InputFormat<ByteBuffer, SortedMap<B
         recordReader.initialize((org.apache.hadoop.mapreduce.InputSplit)split, tac);
         return recordReader;
     }
+    
+    @Override
+    protected void validateConfiguration(Configuration conf)
+    {
+        super.validateConfiguration(conf);
+        // on top of the inherited checks, this input format also requires a slice predicate
+        if (ConfigHelper.getInputSlicePredicate(conf) == null)
+        {
+            throw new UnsupportedOperationException("you must set the predicate with setInputSlicePredicate");
+        }
+    }
 
 }