You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2014/03/17 22:36:27 UTC

[3/6] git commit: Update hadoop_cql3_word_count example patch by Chandar Pechetty; reviewed by Alex Liu for CASSANDRA-6703

Update hadoop_cql3_word_count example
patch by Chandar Pechetty; reviewed by Alex Liu for CASSANDRA-6703


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b4f2ff17
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b4f2ff17
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b4f2ff17

Branch: refs/heads/trunk
Commit: b4f2ff17ad59e80c173f95f5f73419b989108c65
Parents: 98ed6a4
Author: Jonathan Ellis <jb...@apache.org>
Authored: Mon Mar 17 16:34:23 2014 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Mon Mar 17 16:34:23 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../hadoop_cql3_word_count/src/WordCount.java   | 44 +++-------------
 .../src/WordCountCounters.java                  | 16 ++++++
 .../src/WordCountSetup.java                     | 55 ++++----------------
 4 files changed, 35 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4f2ff17/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 688a759..040af7c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.7
+ * Update hadoop_cql3_word_count example (CASSANDRA-6793)
  * Fix handling of RejectedExecution in sync Thrift server (CASSANDRA-6788)
  * Log more information when exceeding tombstone_warn_threshold (CASSANDRA-6865)
  * Fix truncate to not abort due to unreachable fat clients (CASSANDRA-6864)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4f2ff17/examples/hadoop_cql3_word_count/src/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/src/WordCount.java b/examples/hadoop_cql3_word_count/src/WordCount.java
index c92f047..bc81a53 100644
--- a/examples/hadoop_cql3_word_count/src/WordCount.java
+++ b/examples/hadoop_cql3_word_count/src/WordCount.java
@@ -45,21 +45,16 @@ import java.nio.charset.CharacterCodingException;
 
 /**
  * This counts the occurrences of words in ColumnFamily
- *   cql3_worldcount ( user_id text,
- *                   category_id text,
- *                   sub_category_id text,
- *                   title  text,
- *                   body  text,
- *                   PRIMARY KEY (user_id, category_id, sub_category_id))
+ *   cql3_worldcount ( id uuid,
+ *                   line  text,
+ *                   PRIMARY KEY (id))
  *
  * For each word, we output the total number of occurrences across all body texts.
  *
  * When outputting to Cassandra, we write the word counts to column family
- *  output_words ( row_id1 text,
- *                 row_id2 text,
- *                 word text,
+ *  output_words ( word text,
  *                 count_num text,
- *                 PRIMARY KEY ((row_id1, row_id2), word))
+ *                 PRIMARY KEY (word))
  * as a {word, count} to columns: word, count_num with a row key of "word sum"
  */
 public class WordCount extends Configured implements Tool
@@ -98,14 +93,11 @@ public class WordCount extends Configured implements Tool
         {
             for (Entry<String, ByteBuffer> column : columns.entrySet())
             {
-                if (!"body".equalsIgnoreCase(column.getKey()))
+                if (!"line".equalsIgnoreCase(column.getKey()))
                     continue;
 
                 String value = ByteBufferUtil.string(column.getValue());
 
-                logger.debug("read {}:{}={} from {}",
-                             new Object[] {toString(keys), column.getKey(), value, context.getInputSplit()});
-
                 StringTokenizer itr = new StringTokenizer(value);
                 while (itr.hasMoreTokens())
                 {
@@ -114,21 +106,6 @@ public class WordCount extends Configured implements Tool
                 }
             }
         }
-
-        private String toString(Map<String, ByteBuffer> keys)
-        {
-            String result = "";
-            try
-            {
-                for (ByteBuffer key : keys.values())
-                    result = result + ByteBufferUtil.string(key) + ":";
-            }
-            catch (CharacterCodingException e)
-            {
-                logger.error("Failed to print keys", e);
-            }
-            return result;
-        }
     }
 
     public static class ReducerToFilesystem extends Reducer<Text, IntWritable, Text, IntWritable>
@@ -150,9 +127,6 @@ public class WordCount extends Configured implements Tool
         throws IOException, InterruptedException
         {
             keys = new LinkedHashMap<String, ByteBuffer>();
-            String[] partitionKeys = context.getConfiguration().get(PRIMARY_KEY).split(",");
-            keys.put("row_id1", ByteBufferUtil.bytes(partitionKeys[0]));
-            keys.put("row_id2", ByteBufferUtil.bytes(partitionKeys[1]));
         }
 
         public void reduce(Text word, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
@@ -160,13 +134,13 @@ public class WordCount extends Configured implements Tool
             int sum = 0;
             for (IntWritable val : values)
                 sum += val.get();
+            keys.put("word", ByteBufferUtil.bytes(word.toString()));
             context.write(keys, getBindVariables(word, sum));
         }
 
         private List<ByteBuffer> getBindVariables(Text word, int sum)
         {
             List<ByteBuffer> variables = new ArrayList<ByteBuffer>();
-            keys.put("word", ByteBufferUtil.bytes(word.toString()));
             variables.add(ByteBufferUtil.bytes(String.valueOf(sum)));         
             return variables;
         }
@@ -223,9 +197,7 @@ public class WordCount extends Configured implements Tool
         ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
 
         CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3");
-        //this is the user defined filter clauses, you can comment it out if you want count all titles
-        CqlConfigHelper.setInputWhereClauses(job.getConfiguration(), "title='A'");
         job.waitForCompletion(true);
         return 0;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4f2ff17/examples/hadoop_cql3_word_count/src/WordCountCounters.java
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/src/WordCountCounters.java b/examples/hadoop_cql3_word_count/src/WordCountCounters.java
index 8454b70..542a473 100644
--- a/examples/hadoop_cql3_word_count/src/WordCountCounters.java
+++ b/examples/hadoop_cql3_word_count/src/WordCountCounters.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -63,6 +64,7 @@ public class WordCountCounters extends Configured implements Tool
     public static class SumMapper extends Mapper<Map<String, ByteBuffer>, Map<String, ByteBuffer>, Text, LongWritable>
     {
         long sum = -1;
+
         public void map(Map<String, ByteBuffer> key, Map<String, ByteBuffer> columns, Context context) throws IOException, InterruptedException
         {   
             if (sum < 0)
@@ -94,12 +96,26 @@ public class WordCountCounters extends Configured implements Tool
     }
 
     
+    public static class ReducerToFilesystem extends Reducer<Text, LongWritable, Text, LongWritable>
+    {
+        /** Writes the total of all values grouped under {@code key}; also used as the combiner. */
+        public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException
+        {
+            long sum = 0; // per-call local: a field would carry totals over from previously reduced keys
+            for (LongWritable val : values)
+                sum += val.get();
+            context.write(key, new LongWritable(sum));
+        }
+    }
+
     public int run(String[] args) throws Exception
     {
         Job job = new Job(getConf(), "wordcountcounters");
         job.setJarByClass(WordCountCounters.class);
         job.setMapperClass(SumMapper.class);
 
+        job.setCombinerClass(ReducerToFilesystem.class);
+        job.setReducerClass(ReducerToFilesystem.class);
         job.setOutputKeyClass(Text.class);
         job.setOutputValueClass(LongWritable.class);
         FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4f2ff17/examples/hadoop_cql3_word_count/src/WordCountSetup.java
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/src/WordCountSetup.java b/examples/hadoop_cql3_word_count/src/WordCountSetup.java
index 0acb8f7..ebf7485 100644
--- a/examples/hadoop_cql3_word_count/src/WordCountSetup.java
+++ b/examples/hadoop_cql3_word_count/src/WordCountSetup.java
@@ -90,12 +90,9 @@ public class WordCountSetup
             TException
     {
         String query = "CREATE TABLE " + WordCount.KEYSPACE + "."  + WordCount.COLUMN_FAMILY + 
-                          " ( user_id text," +
-                          "   category_id text, " +
-                          "   sub_category_id text," +
-                          "   title  text," +
-                          "   body  text," +
-                          "   PRIMARY KEY (user_id, category_id, sub_category_id) ) ";
+                          " ( id uuid," +
+                          "   line text, " +
+                          "   PRIMARY KEY (id) ) ";
 
         try
         {
@@ -107,22 +104,10 @@ public class WordCountSetup
             logger.error("failed to create table " + WordCount.KEYSPACE + "."  + WordCount.COLUMN_FAMILY, e);
         }
 
-        query = "CREATE INDEX title on " + WordCount.COLUMN_FAMILY + "(title)";
-        try
-        {
-            logger.info("set up index on title column ");
-            client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE);
-        }
-        catch (InvalidRequestException e)
-        {
-            logger.error("Failed to create index on title", e);
-        }
-
         query = "CREATE TABLE " + WordCount.KEYSPACE + "."  + WordCount.OUTPUT_COLUMN_FAMILY + 
-                " ( row_id text," +
-                "   word text, " +
+                " ( word text," +
                 "   count_num text," +
-                "   PRIMARY KEY (row_id, word) ) ";
+                "   PRIMARY KEY (word) ) ";
 
         try
         {
@@ -163,26 +148,19 @@ public class WordCountSetup
             TException
     {
         String query = "INSERT INTO " + WordCount.COLUMN_FAMILY +  
-                           "(user_id, category_id, sub_category_id, title, body ) " +
-                           " values (?, ?, ?, ?, ?) ";
+                           "(id, line) " +
+                           " values (?, ?) ";
         CqlPreparedResult result = client.prepare_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE);
 
-        String [] title = titleData();
         String [] body = bodyData();
-        for (int i=1; i<5; i++)
+        for (int i = 0; i < 5; i++)
         {         
-            for (int j=1; j<444; j++) 
+            for (int j = 1; j <= 200; j++)
             {
-                for (int k=1; k<4; k++)
-                {
                     List<ByteBuffer> values = new ArrayList<ByteBuffer>();
-                    values.add(ByteBufferUtil.bytes(String.valueOf(j)));
-                    values.add(ByteBufferUtil.bytes(String.valueOf(i)));
-                    values.add(ByteBufferUtil.bytes(String.valueOf(k)));
-                    values.add(ByteBufferUtil.bytes(title[i]));
+                    values.add(ByteBufferUtil.bytes(UUID.randomUUID()));
                     values.add(ByteBufferUtil.bytes(body[i]));
                     client.execute_prepared_cql3_query(result.itemId, values, ConsistencyLevel.ONE);
-                }
             }
         } 
     }
@@ -190,7 +168,6 @@ public class WordCountSetup
     private static String[] bodyData()
     {   // Public domain context, source http://en.wikisource.org/wiki/If%E2%80%94
         return new String[]{
-                "",
                 "If you can keep your head when all about you",
                 "Are losing theirs and blaming it on you",
                 "If you can trust yourself when all men doubt you,",
@@ -198,16 +175,4 @@ public class WordCountSetup
                 "If you can wait and not be tired by waiting,"
         };
     }
-
-    private static String[] titleData()
-    {   // Public domain context, source http://en.wikisource.org/wiki/If%E2%80%94
-        return new String[]{
-                "",
-                "A",
-                "B",
-                "C",
-                "D",
-                "E"            
-        };
-    }
 }