You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-user@hadoop.apache.org by MB Leasing <le...@marketingbrokers.com> on 2008/03/16 01:52:28 UTC

Lucene reduce

Hello,

For those interested, you can filter and
search Lucene documents in the reduce phase
of a Hadoop MapReduce job.

code:

import java.io.*;
import java.util.*;

import org.apache.lucene.index.Term;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.search.Searcher;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.Hits;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.*;

public class ql
    {
    /**************************************
    * Query Lucene using keys.
    *
    * input:
    * java^this page is about java
    * ruby^site only mentions rails
    * php^another resource about php
    * java^ejb3 discussed and spring
    * eof^eof
    *
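    * map: build Lucene docs and emit them in batches;
    * reduce: index each batch in memory and keep only the
    * docs whose body mentions their own topic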
    *
    * output:
    * php^topic^another resource about php
    * java^topic^this page is about java
    ***************************************/

    /**
     * Mapper: turns each "topic^body" input line into a Lucene Document and
     * emits the documents in serialized groups of up to BATCH_SIZE.
     *
     * Output key:   "BATCH_" + generated key (Text)
     * Output value: java-serialized Map of item key -> Document (BytesWritable)
     */
    public static class M extends MapReduceBase implements Mapper
       {
       /** Number of documents gathered before a group is emitted. */
       private static final int BATCH_SIZE=100;

       // item key -> Document for the group currently being built
       HashMap hm=new HashMap();
       Map group_m=Collections.synchronizedMap(hm);
       String ITEM_KEY,BATCH_KEY="";int batch=0;

       // collector from the most recent map() call, kept so close() can
       // still emit a trailing, partially filled group
       private OutputCollector last_out;

       /**
        * Adds one input line to the current group, emitting the group first
        * when it is full.
        *
        * @param wc  input key (unused)
        * @param w   input line as Text, expected form "topic^body"
        * @param out collector receiving (batch key, serialized group)
        * @param rep reporter (unused)
        * @throws IOException if the collector fails
        */
       public void map(WritableComparable wc,Writable w,
          OutputCollector out,Reporter rep)throws IOException
          {
          last_out=out;
          String ln=((Text)w).toString();
          String[] parse_a=ln.split("\\^");

          // group is full: emit it before starting a new one
          if(batch>=BATCH_SIZE)flush(out);

          // the "eof" sentinel only forces a flush; it is not a document
          if(parse_a[0].equals("eof")){flush(out);return;}

          // a line without a '^' separated body cannot be indexed; skip it
          // instead of failing on parse_a[1]
          if(parse_a.length<2)return;

          ITEM_KEY="ITEM_"+key_maker(parse_a[0]);
          Document single_d=make_lucene_doc(parse_a[0],parse_a[1],ITEM_KEY);
          group_m.put(ITEM_KEY,single_d);
          batch++;
          }

       /**
        * Emits whatever is left in the current group, so input that does not
        * end with an "eof" line still reaches the reducer.
        * NOTE(review): relies on the framework calling close() after the last
        * map() call while the collector is still usable - confirm for the
        * Hadoop version in use.
        */
       public void close()throws IOException
          {
          if(last_out!=null)flush(last_out);
          }

       /** Emits the current group under a freshly generated key and resets it; no-op when empty. */
       private void flush(OutputCollector out)throws IOException
          {
          if(group_m.isEmpty())return;
          BATCH_KEY="BATCH_"+key_maker(String.valueOf(batch));
          out.collect(new Text(BATCH_KEY),new BytesWritable(ob(group_m)));
          batch=0;
          group_m.clear();
          }
       }

    /**
     * Reducer: for every serialized document group it receives, builds a
     * temporary in-memory Lucene index and emits the documents that match
     * both their own item key and, in the body, their own topic.
     *
     * Output value: topic + "^topic^" + body (Text), under the incoming key.
     */
    public static class R extends MapReduceBase implements Reducer
       {
       /**
        * Processes each group independently; a group that cannot be
        * deserialized or searched is reported and skipped so the remaining
        * groups are still handled.
        *
        * @param wc  batch key, reused as the output key
        * @param it  iterator over BytesWritable values, each a serialized Map
        * @param out collector receiving the matching documents
        * @param rep reporter (unused)
        */
       public void reduce(WritableComparable wc,Iterator it,
          OutputCollector out,Reporter rep)throws IOException
          {
          while(it.hasNext())
             {
             try
                {
                // NOTE(review): get() is passed whole to bo(); presumably the
                // buffer may be longer than the payload, which is harmless
                // because only one object is read - confirm.
                Map m=(Map)bo(((BytesWritable)it.next()).get());
                if(m==null)continue;

                try{filter_group(wc,m,out);}
                catch(Exception e){System.out.println(e);}
                }
             catch(Exception io){io.printStackTrace();}
             }
          }

       /** Indexes one group and emits every document that passes the topic filter. */
       private static void filter_group(WritableComparable wc,Map m,
          OutputCollector out)throws Exception
          {
          List keys=new ArrayList(m.keySet());
          Directory rd=build_index(m,keys);

          Searcher is=new IndexSearcher(rd);
          try
             {
             for(Iterator itr_s=keys.iterator();itr_s.hasNext();)
                {
                Hits h=is.search(topic_query(itr_s.next().toString()));
                for(int j=0;j<h.length();j++)
                   {
                   Document doc=h.doc(j);
                   String tmp_tpc=doc.get("topic");
                   String tmp_bdy=doc.get("body");
                   out.collect(wc,new Text(tmp_tpc+"^topic^"+tmp_bdy));
                   }
                }
             }
          finally
             {
             // release the searcher even when a query or collect fails
             is.close();
             }
          }

       /** Writes every Document of the group into a new in-memory index. */
       private static Directory build_index(Map m,List keys)throws Exception
          {
          Directory rd=new RAMDirectory();
          Analyzer sa=new StandardAnalyzer();
          IndexWriter iw=new IndexWriter(rd,sa,true);
          try
             {
             for(Iterator itr_u=keys.iterator();itr_u.hasNext();)
                {
                iw.addDocument((Document)m.get(itr_u.next()));
                }
             iw.optimize();
             }
          finally
             {
             // close the writer even when adding a document fails
             iw.close();
             }
          return rd;
          }

       /**
        * Builds the filter for one item key: the document must carry exactly
        * this item key AND contain, in its body, the topic that follows the
        * last '_' of the key.
        * NOTE(review): the topic term is used as-is; if the analyzer changes
        * case at index time, mixed-case topics may never match - confirm.
        */
       private static BooleanQuery topic_query(String item_key)
          {
          TermQuery tq_i=new TermQuery(new Term("item",item_key.trim()));

          String topic=item_key.substring(item_key.lastIndexOf("_")+1);
          TermQuery tq_b=new TermQuery(new Term("body",topic));

          BooleanQuery bq=new BooleanQuery();
          bq.add(tq_i,BooleanClause.Occur.MUST);
          bq.add(tq_b,BooleanClause.Occur.MUST);
          return bq;
          }
       }

    /**
     * Configures and runs the "ql" job.
     *
     * @param args optional: args[0] = input directory, args[1] = output
     *             directory; the original hard-coded paths are used for
     *             whichever argument is missing
     * @throws Exception if the job cannot be configured or fails, so the
     *                   failure reaches the caller instead of being printed
     *                   and ignored
     */
    public static void main(String[] args)throws Exception
       {
       String IN_DIR="/opt/hadoop-0.15.0/indir";
       String OUT_DIR="/opt/hadoop-0.15.0/outdir";
       if(args!=null&&args.length>0)IN_DIR=args[0];
       if(args!=null&&args.length>1)OUT_DIR=args[1];

       JobConf jc=new JobConf(ql.class);
       jc.setJobName("ql");
       jc.setMapperClass(M.class);
       jc.setReducerClass(R.class);
       jc.setOutputKeyClass(Text.class);
       // the mapper emits serialized groups, the reducer emits text lines,
       // so the two value classes have to be declared separately
       jc.setMapOutputValueClass(BytesWritable.class);
       jc.setOutputValueClass(Text.class);
       jc.setInputPath(new Path(IN_DIR));
       jc.setOutputPath(new Path(OUT_DIR));
       JobClient.runJob(jc);
       }

    /**
     * Builds the Lucene document for one input record.
     *
     * Fields: "topic" (stored, tokenized), "item" (not stored, indexed
     * un-tokenized) and "body" (stored, tokenized).
     *
     * @param in_tpc topic text
     * @param in_bdy body text
     * @param in_itm item key of the record
     * @return the new document carrying the three fields
     */
    private static Document make_lucene_doc(String in_tpc,String in_bdy,String in_itm)
       {
       Field topic_f=new Field("topic",in_tpc,Field.Store.YES,Field.Index.TOKENIZED);
       Field item_f=new Field("item",in_itm,Field.Store.NO,Field.Index.UN_TOKENIZED);
       Field body_f=new Field("body",in_bdy,Field.Store.YES,Field.Index.TOKENIZED);

       Document doc=new Document();
       doc.add(topic_f);
       doc.add(item_f);
       doc.add(body_f);
       return doc;
       }

    /**
     * Generates a key of the form random-number + current-date + "_" + t,
     * passed through key_filter so only letters, digits and '_' remain.
     *
     * @param t suffix to append after the '_' separator
     * @return the filtered key
     */
    private static String key_maker(String t)
       {
       String random_part=Double.toString(new Random().nextDouble());
       String date_part=new Date(System.currentTimeMillis()).toString();
       String raw_key=random_part+date_part+"_"+t;
       return key_filter(raw_key);
       }

    /**
     * Removes every character that is not an ASCII letter, an ASCII digit
     * or '_' from the given string.
     *
     * @param s text to filter
     * @return s with all other characters dropped, original order kept
     */
    private static String key_filter(String s)
       { // adapted
       final String allowed=
          "_abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";

       StringBuilder kept=new StringBuilder();
       char[] chars=s.toCharArray();
       for(int i=0;i<chars.length;i++)
          {
          char c=chars[i];
          if(allowed.indexOf(c)>=0)
             {
             kept.append(c);
             }
          }
       return kept.toString().trim();
       }

    /**
     * Reads one java-serialized object back from a byte array.
     *
     * @param b bytes starting with a serialized object stream
     * @return the first object read from the stream
     * @throws Exception if the bytes cannot be read as a serialized object
     */
    private static Object bo(byte[] b)throws Exception
       { // adapted
       ByteArrayInputStream raw=new ByteArrayInputStream(b);
       ObjectInputStream in=new ObjectInputStream(raw);
       Object result=in.readObject();
       in.close();
       raw.close();
       return result;
       }

    /**
     * Java-serializes an object into a byte array.
     *
     * @param o object to serialize
     * @return the complete serialized form of o
     * @throws IllegalArgumentException if o cannot be serialized; the
     *         underlying IOException is kept as the cause. Failing here
     *         avoids handing an empty array to a consumer that would only
     *         discover the problem when deserializing.
     */
    private static byte[] ob(Object o)
       { // adapted
       ByteArrayOutputStream bos=new ByteArrayOutputStream();
       try
          {
          ObjectOutputStream os=new ObjectOutputStream(bos);
          try
             {
             os.writeObject(o);
             }
          finally
             {
             // closing flushes anything still buffered into bos
             os.close();
             }
          }
       catch(IOException e)
          {
          String type=(o==null)?"null":o.getClass().getName();
          throw new IllegalArgumentException("cannot serialize "+type,e);
          }
       // snapshot only after the object stream has been closed
       return bos.toByteArray();
       }
    }

Regards,

Peter W.