You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2013/05/15 17:22:29 UTC
svn commit: r1482896 -
/accumulo/branches/1.5/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/NGramIngest.java
Author: ecn
Date: Wed May 15 15:22:29 2013
New Revision: 1482896
URL: http://svn.apache.org/r1482896
Log:
ACCUMULO-1417 ngram ingester
Added:
accumulo/branches/1.5/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/NGramIngest.java (with props)
Added: accumulo/branches/1.5/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/NGramIngest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.5/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/NGramIngest.java?rev=1482896&view=auto
==============================================================================
--- accumulo/branches/1.5/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/NGramIngest.java (added)
+++ accumulo/branches/1.5/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/NGramIngest.java Wed May 15 15:22:29 2013
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.simple.mapreduce;
+
+import java.io.IOException;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.accumulo.core.cli.ClientOnRequiredTable;
+import org.apache.accumulo.core.client.mapreduce.AccumuloOutputFormat;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.util.CachedConfiguration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Logger;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * Map job to ingest n-gram files from
+ * http://storage.googleapis.com/books/ngrams/books/datasetsv2.html
+ */
+public class NGramIngest extends Configured implements Tool {
+
+ private static final Logger log = Logger.getLogger(NGramIngest.class);
+
+
+ static class Opts extends ClientOnRequiredTable {
+ @Parameter(names = "--input", required=true)
+ String inputDirectory;
+ }
+ static class NGramMapper extends Mapper<LongWritable, Text, Text, Mutation> {
+
+ @Override
+ protected void map(LongWritable location, Text value, Context context) throws IOException, InterruptedException {
+ String parts[] = value.toString().split("\\t");
+ if (parts.length >= 4) {
+ Mutation m = new Mutation(parts[0]);
+ m.put(parts[1], String.format("%010d", Long.parseLong(parts[2])), new Value(parts[3].trim().getBytes()));
+ context.write(null, m);
+ }
+ }
+ }
+
+ /**
+ * @param args
+ */
+ @Override
+ public int run(String[] args) throws Exception {
+ Opts opts = new Opts();
+ opts.parseArgs(getClass().getName(), args);
+
+ Job job = new Job(getConf(), getClass().getSimpleName());
+ job.setJarByClass(getClass());
+
+ opts.setAccumuloConfigs(job);
+ job.setInputFormatClass(TextInputFormat.class);
+ job.setOutputFormatClass(AccumuloOutputFormat.class);
+
+ job.setMapperClass(NGramMapper.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(Mutation.class);
+
+ job.setNumReduceTasks(0);
+ job.setSpeculativeExecution(false);
+
+
+ if (!opts.getConnector().tableOperations().exists(opts.tableName)) {
+ log.info("Creating table " + opts.tableName);
+ opts.getConnector().tableOperations().create(opts.tableName);
+ SortedSet<Text> splits = new TreeSet<Text>();
+ String numbers[] = "1 2 3 4 5 6 7 8 9".split("\\s");
+ String lower[] = "a b c d e f g h i j k l m n o p q r s t u v w x y z".split("\\s");
+ String upper[] = "A B C D E F G H I J K L M N O P Q R S T U V W X Y Z".split("\\s");
+ for (String[] array : new String[][]{numbers, lower, upper}) {
+ for (String s : array) {
+ splits.add(new Text(s));
+ }
+ }
+ opts.getConnector().tableOperations().addSplits(opts.tableName, splits);
+ }
+
+ TextInputFormat.addInputPath(job, new Path(opts.inputDirectory));
+ job.waitForCompletion(true);
+ return job.isSuccessful() ? 0 : 1;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int res = ToolRunner.run(CachedConfiguration.getInstance(), new NGramIngest(), args);
+ if (res != 0)
+ System.exit(res);
+ }
+
+}
Propchange: accumulo/branches/1.5/examples/simple/src/main/java/org/apache/accumulo/examples/simple/mapreduce/NGramIngest.java
------------------------------------------------------------------------------
svn:eol-style = native