You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2014/03/17 22:36:27 UTC
[3/6] git commit: Update hadoop_cql3_word_count example patch by
Chandar Pechetty; reviewed by Alex Liu for CASSANDRA-6793
Update hadoop_cql3_word_count example
patch by Chandar Pechetty; reviewed by Alex Liu for CASSANDRA-6793
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b4f2ff17
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b4f2ff17
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b4f2ff17
Branch: refs/heads/trunk
Commit: b4f2ff17ad59e80c173f95f5f73419b989108c65
Parents: 98ed6a4
Author: Jonathan Ellis <jb...@apache.org>
Authored: Mon Mar 17 16:34:23 2014 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Mon Mar 17 16:34:23 2014 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../hadoop_cql3_word_count/src/WordCount.java | 44 +++-------------
.../src/WordCountCounters.java | 16 ++++++
.../src/WordCountSetup.java | 55 ++++----------------
4 files changed, 35 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4f2ff17/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 688a759..040af7c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.7
+ * Update hadoop_cql3_word_count example (CASSANDRA-6793)
* Fix handling of RejectedExecution in sync Thrift server (CASSANDRA-6788)
* Log more information when exceeding tombstone_warn_threshold (CASSANDRA-6865)
* Fix truncate to not abort due to unreachable fat clients (CASSANDRA-6864)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4f2ff17/examples/hadoop_cql3_word_count/src/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/src/WordCount.java b/examples/hadoop_cql3_word_count/src/WordCount.java
index c92f047..bc81a53 100644
--- a/examples/hadoop_cql3_word_count/src/WordCount.java
+++ b/examples/hadoop_cql3_word_count/src/WordCount.java
@@ -45,21 +45,16 @@ import java.nio.charset.CharacterCodingException;
/**
* This counts the occurrences of words in ColumnFamily
- * cql3_worldcount ( user_id text,
- * category_id text,
- * sub_category_id text,
- * title text,
- * body text,
- * PRIMARY KEY (user_id, category_id, sub_category_id))
+ * cql3_worldcount ( id uuid,
+ * line text,
+ * PRIMARY KEY (id))
*
* For each word, we output the total number of occurrences across all body texts.
*
* When outputting to Cassandra, we write the word counts to column family
- * output_words ( row_id1 text,
- * row_id2 text,
- * word text,
+ * output_words ( word text,
* count_num text,
- * PRIMARY KEY ((row_id1, row_id2), word))
+ * PRIMARY KEY (word))
* as a {word, count} to columns: word, count_num with a row key of "word sum"
*/
public class WordCount extends Configured implements Tool
@@ -98,14 +93,11 @@ public class WordCount extends Configured implements Tool
{
for (Entry<String, ByteBuffer> column : columns.entrySet())
{
- if (!"body".equalsIgnoreCase(column.getKey()))
+ if (!"line".equalsIgnoreCase(column.getKey()))
continue;
String value = ByteBufferUtil.string(column.getValue());
- logger.debug("read {}:{}={} from {}",
- new Object[] {toString(keys), column.getKey(), value, context.getInputSplit()});
-
StringTokenizer itr = new StringTokenizer(value);
while (itr.hasMoreTokens())
{
@@ -114,21 +106,6 @@ public class WordCount extends Configured implements Tool
}
}
}
-
- private String toString(Map<String, ByteBuffer> keys)
- {
- String result = "";
- try
- {
- for (ByteBuffer key : keys.values())
- result = result + ByteBufferUtil.string(key) + ":";
- }
- catch (CharacterCodingException e)
- {
- logger.error("Failed to print keys", e);
- }
- return result;
- }
}
public static class ReducerToFilesystem extends Reducer<Text, IntWritable, Text, IntWritable>
@@ -150,9 +127,6 @@ public class WordCount extends Configured implements Tool
throws IOException, InterruptedException
{
keys = new LinkedHashMap<String, ByteBuffer>();
- String[] partitionKeys = context.getConfiguration().get(PRIMARY_KEY).split(",");
- keys.put("row_id1", ByteBufferUtil.bytes(partitionKeys[0]));
- keys.put("row_id2", ByteBufferUtil.bytes(partitionKeys[1]));
}
public void reduce(Text word, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
@@ -160,13 +134,13 @@ public class WordCount extends Configured implements Tool
int sum = 0;
for (IntWritable val : values)
sum += val.get();
+ keys.put("word", ByteBufferUtil.bytes(word.toString()));
context.write(keys, getBindVariables(word, sum));
}
private List<ByteBuffer> getBindVariables(Text word, int sum)
{
List<ByteBuffer> variables = new ArrayList<ByteBuffer>();
- keys.put("word", ByteBufferUtil.bytes(word.toString()));
variables.add(ByteBufferUtil.bytes(String.valueOf(sum)));
return variables;
}
@@ -223,9 +197,7 @@ public class WordCount extends Configured implements Tool
ConfigHelper.setInputPartitioner(job.getConfiguration(), "Murmur3Partitioner");
CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), "3");
- //this is the user defined filter clauses, you can comment it out if you want count all titles
- CqlConfigHelper.setInputWhereClauses(job.getConfiguration(), "title='A'");
job.waitForCompletion(true);
return 0;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4f2ff17/examples/hadoop_cql3_word_count/src/WordCountCounters.java
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/src/WordCountCounters.java b/examples/hadoop_cql3_word_count/src/WordCountCounters.java
index 8454b70..542a473 100644
--- a/examples/hadoop_cql3_word_count/src/WordCountCounters.java
+++ b/examples/hadoop_cql3_word_count/src/WordCountCounters.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -63,6 +64,7 @@ public class WordCountCounters extends Configured implements Tool
public static class SumMapper extends Mapper<Map<String, ByteBuffer>, Map<String, ByteBuffer>, Text, LongWritable>
{
long sum = -1;
+
public void map(Map<String, ByteBuffer> key, Map<String, ByteBuffer> columns, Context context) throws IOException, InterruptedException
{
if (sum < 0)
@@ -94,12 +96,26 @@ public class WordCountCounters extends Configured implements Tool
}
+    public static class ReducerToFilesystem extends Reducer<Text, LongWritable, Text, LongWritable>
+    {
+        // Sums the values of one key; sum is local so one key's total never carries over into the next key's output.
+        public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException
+        {
+            long sum = 0;
+            for (LongWritable val : values)
+                sum += val.get();
+            context.write(key, new LongWritable(sum));
+        }
+    }
+
public int run(String[] args) throws Exception
{
Job job = new Job(getConf(), "wordcountcounters");
job.setJarByClass(WordCountCounters.class);
job.setMapperClass(SumMapper.class);
+ job.setCombinerClass(ReducerToFilesystem.class);
+ job.setReducerClass(ReducerToFilesystem.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b4f2ff17/examples/hadoop_cql3_word_count/src/WordCountSetup.java
----------------------------------------------------------------------
diff --git a/examples/hadoop_cql3_word_count/src/WordCountSetup.java b/examples/hadoop_cql3_word_count/src/WordCountSetup.java
index 0acb8f7..ebf7485 100644
--- a/examples/hadoop_cql3_word_count/src/WordCountSetup.java
+++ b/examples/hadoop_cql3_word_count/src/WordCountSetup.java
@@ -90,12 +90,9 @@ public class WordCountSetup
TException
{
String query = "CREATE TABLE " + WordCount.KEYSPACE + "." + WordCount.COLUMN_FAMILY +
- " ( user_id text," +
- " category_id text, " +
- " sub_category_id text," +
- " title text," +
- " body text," +
- " PRIMARY KEY (user_id, category_id, sub_category_id) ) ";
+ " ( id uuid," +
+ " line text, " +
+ " PRIMARY KEY (id) ) ";
try
{
@@ -107,22 +104,10 @@ public class WordCountSetup
logger.error("failed to create table " + WordCount.KEYSPACE + "." + WordCount.COLUMN_FAMILY, e);
}
- query = "CREATE INDEX title on " + WordCount.COLUMN_FAMILY + "(title)";
- try
- {
- logger.info("set up index on title column ");
- client.execute_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE, ConsistencyLevel.ONE);
- }
- catch (InvalidRequestException e)
- {
- logger.error("Failed to create index on title", e);
- }
-
query = "CREATE TABLE " + WordCount.KEYSPACE + "." + WordCount.OUTPUT_COLUMN_FAMILY +
- " ( row_id text," +
- " word text, " +
+ " ( word text," +
" count_num text," +
- " PRIMARY KEY (row_id, word) ) ";
+ " PRIMARY KEY (word) ) ";
try
{
@@ -163,26 +148,19 @@ public class WordCountSetup
TException
{
String query = "INSERT INTO " + WordCount.COLUMN_FAMILY +
- "(user_id, category_id, sub_category_id, title, body ) " +
- " values (?, ?, ?, ?, ?) ";
+ "(id, line) " +
+ " values (?, ?) ";
CqlPreparedResult result = client.prepare_cql3_query(ByteBufferUtil.bytes(query), Compression.NONE);
- String [] title = titleData();
String [] body = bodyData();
- for (int i=1; i<5; i++)
+ for (int i = 0; i < 5; i++)
{
- for (int j=1; j<444; j++)
+ for (int j = 1; j <= 200; j++)
{
- for (int k=1; k<4; k++)
- {
List<ByteBuffer> values = new ArrayList<ByteBuffer>();
- values.add(ByteBufferUtil.bytes(String.valueOf(j)));
- values.add(ByteBufferUtil.bytes(String.valueOf(i)));
- values.add(ByteBufferUtil.bytes(String.valueOf(k)));
- values.add(ByteBufferUtil.bytes(title[i]));
+ values.add(ByteBufferUtil.bytes(UUID.randomUUID()));
values.add(ByteBufferUtil.bytes(body[i]));
client.execute_prepared_cql3_query(result.itemId, values, ConsistencyLevel.ONE);
- }
}
}
}
@@ -190,7 +168,6 @@ public class WordCountSetup
private static String[] bodyData()
{ // Public domain context, source http://en.wikisource.org/wiki/If%E2%80%94
return new String[]{
- "",
"If you can keep your head when all about you",
"Are losing theirs and blaming it on you",
"If you can trust yourself when all men doubt you,",
@@ -198,16 +175,4 @@ public class WordCountSetup
"If you can wait and not be tired by waiting,"
};
}
-
- private static String[] titleData()
- { // Public domain context, source http://en.wikisource.org/wiki/If%E2%80%94
- return new String[]{
- "",
- "A",
- "B",
- "C",
- "D",
- "E"
- };
- }
}