Posted to common-commits@hadoop.apache.org by st...@apache.org on 2007/09/28 18:12:25 UTC
svn commit: r580399 - in /lucene/hadoop/trunk/src/contrib/hbase/src:
java/org/apache/hadoop/hbase/mapred/ test/org/apache/hadoop/hbase/
test/org/apache/hadoop/hbase/mapred/
Author: stack
Date: Fri Sep 28 09:12:24 2007
New Revision: 580399
URL: http://svn.apache.org/viewvc?rev=580399&view=rev
Log:
HADOOP-1913 Build a Lucene index on an HBase table
Files I failed to add/delete in the original commit
Added:
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/BuildTableIndex.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexConfiguration.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexOutputFormat.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexTableReduce.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/LuceneDocumentWrapper.java
lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/
lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java
lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java
Removed:
lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestTableMapReduce.java
Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/BuildTableIndex.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/BuildTableIndex.java?rev=580399&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/BuildTableIndex.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/BuildTableIndex.java Fri Sep 28 09:12:24 2007
@@ -0,0 +1,184 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+
+/**
+ * Example table column indexing class. Runs a mapreduce job to index
+ * specified table columns.
+ * <ul><li>Each row is modeled as a Lucene document: row key is indexed in
+ * its untokenized form, column name-value pairs are Lucene field name-value
+ * pairs.</li>
+ * <li>A file passed on command line is used to populate an
+ * {@link IndexConfiguration} which is used to set various Lucene parameters,
+ * specify whether to optimize an index and which columns to index and/or
+ * store, in tokenized or untokenized form, etc. For an example, see the
+ * <code>createIndexConfContent</code> method in TestTableIndex
+ * </li>
+ * <li>The number of reduce tasks determines the number of index partitions.
+ * The indexes are stored in the output path of the job configuration.</li>
+ * <li>The index build process is done in the reduce phase. Users can use
+ * the map phase to join rows from different tables or to pre-parse/analyze
+ * column content, etc.</li>
+ * </ul>
+ */
+public class BuildTableIndex {
+ private static final String USAGE = "Usage: BuildTableIndex " +
+ "-m <numMapTasks> -r <numReduceTasks>\n -indexConf <iconfFile> " +
+ "-indexDir <indexDir>\n -table <tableName> -columns <columnName1> " +
+ "[<columnName2> ...]";
+
+ private static void printUsage(String message) {
+ System.err.println(message);
+ System.err.println(USAGE);
+ System.exit(-1);
+ }
+
+ public BuildTableIndex() {
+ super();
+ }
+
+ public void run(String[] args) throws IOException {
+ if (args.length < 6) {
+ printUsage("Too few arguments");
+ }
+
+ int numMapTasks = 1;
+ int numReduceTasks = 1;
+ String iconfFile = null;
+ String indexDir = null;
+ String tableName = null;
+ StringBuffer columnNames = null;
+
+ // parse args
+ for (int i = 0; i < args.length - 1; i++) {
+ if ("-m".equals(args[i])) {
+ numMapTasks = Integer.parseInt(args[++i]);
+ } else if ("-r".equals(args[i])) {
+ numReduceTasks = Integer.parseInt(args[++i]);
+ } else if ("-indexConf".equals(args[i])) {
+ iconfFile = args[++i];
+ } else if ("-indexDir".equals(args[i])) {
+ indexDir = args[++i];
+ } else if ("-table".equals(args[i])) {
+ tableName = args[++i];
+ } else if ("-columns".equals(args[i])) {
+ columnNames = new StringBuffer(args[++i]);
+ while (i + 1 < args.length && !args[i + 1].startsWith("-")) {
+ columnNames.append(" ");
+ columnNames.append(args[++i]);
+ }
+ } else {
+ printUsage("Unsupported option " + args[i]);
+ }
+ }
+
+ if (indexDir == null || tableName == null || columnNames == null) {
+ printUsage("Index directory, table name and at least one column must " +
+ "be specified");
+ }
+
+ Configuration conf = new HBaseConfiguration();
+ if (iconfFile != null) {
+ // set index configuration content from a file
+ String content = readContent(iconfFile);
+ IndexConfiguration iconf = new IndexConfiguration();
+ // purely to validate, exception will be thrown if not valid
+ iconf.addFromXML(content);
+ conf.set("hbase.index.conf", content);
+ }
+
+ JobConf jobConf = createJob(conf, numMapTasks, numReduceTasks, indexDir,
+ tableName, columnNames.toString());
+ JobClient.runJob(jobConf);
+ }
+
+ public JobConf createJob(Configuration conf, int numMapTasks,
+ int numReduceTasks, String indexDir, String tableName,
+ String columnNames) {
+ JobConf jobConf = new JobConf(conf, BuildTableIndex.class);
+ jobConf.setJobName("build index for table " + tableName);
+ jobConf.setNumMapTasks(numMapTasks);
+ // number of indexes to partition into
+ jobConf.setNumReduceTasks(numReduceTasks);
+
+ // use identity map (a waste, but just as an example)
+ IdentityTableMap.initJob(tableName, columnNames, IdentityTableMap.class,
+ jobConf);
+
+ // use IndexTableReduce to build a Lucene index
+ jobConf.setReducerClass(IndexTableReduce.class);
+ jobConf.setOutputPath(new Path(indexDir));
+ jobConf.setOutputFormat(IndexOutputFormat.class);
+
+ return jobConf;
+ }
+
+ /*
+ * Read xml file of indexing configurations. The xml format is similar to
+ * hbase-default.xml and hadoop-default.xml. For an example configuration,
+ * see the <code>createIndexConfContent</code> method in TestTableIndex
+ * @param fileName File to read.
+ * @return XML configuration read from file
+ * @throws IOException
+ */
+ private String readContent(String fileName) throws IOException {
+ File file = new File(fileName);
+ int length = (int) file.length();
+ if (length == 0) {
+ printUsage("Index configuration file " + fileName + " does not exist");
+ }
+
+ int bytesRead = 0;
+ byte[] bytes = new byte[length];
+ FileInputStream fis = new FileInputStream(file);
+
+ try {
+ // read entire file into content
+ while (bytesRead < length) {
+ int read = fis.read(bytes, bytesRead, length - bytesRead);
+ if (read > 0) {
+ bytesRead += read;
+ } else {
+ break;
+ }
+ }
+ } finally {
+ fis.close();
+ }
+
+ return new String(bytes, 0, bytesRead, HConstants.UTF8_ENCODING);
+ }
+
+ public static void main(String[] args) throws IOException {
+ BuildTableIndex build = new BuildTableIndex();
+ build.run(args);
+ }
+}
\ No newline at end of file
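
For reference, the USAGE string above corresponds to an invocation along these lines, assuming the hbase contrib classes and their Lucene dependency are on the Hadoop classpath (the paths, table and column names below are illustrative, not part of this commit):

  $ bin/hadoop org.apache.hadoop.hbase.mapred.BuildTableIndex \
      -m 4 -r 2 \
      -indexConf index-conf.xml \
      -indexDir /user/alice/tableindex \
      -table mytable -columns contents: anchor:

The -r value doubles as the number of index partitions: each reduce task writes one Lucene index under the -indexDir output path.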
Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexConfiguration.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexConfiguration.java?rev=580399&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexConfiguration.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexConfiguration.java Fri Sep 28 09:12:24 2007
@@ -0,0 +1,418 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.StringWriter;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.transform.Transformer;
+import javax.xml.transform.TransformerFactory;
+import javax.xml.transform.dom.DOMSource;
+import javax.xml.transform.stream.StreamResult;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.w3c.dom.Text;
+
+/**
+ * Configuration parameters for building a Lucene index
+ */
+public class IndexConfiguration extends Configuration {
+ private static final Log LOG = LogFactory.getLog(IndexConfiguration.class);
+
+ static final String HBASE_COLUMN_NAME = "hbase.column.name";
+ static final String HBASE_COLUMN_STORE = "hbase.column.store";
+ static final String HBASE_COLUMN_INDEX = "hbase.column.index";
+ static final String HBASE_COLUMN_TOKENIZE = "hbase.column.tokenize";
+ static final String HBASE_COLUMN_BOOST = "hbase.column.boost";
+ static final String HBASE_COLUMN_OMIT_NORMS = "hbase.column.omit.norms";
+ static final String HBASE_INDEX_ROWKEY_NAME = "hbase.index.rowkey.name";
+ static final String HBASE_INDEX_ANALYZER_NAME = "hbase.index.analyzer.name";
+ static final String HBASE_INDEX_MAX_BUFFERED_DOCS =
+ "hbase.index.max.buffered.docs";
+ static final String HBASE_INDEX_MAX_BUFFERED_DELS =
+ "hbase.index.max.buffered.dels";
+ static final String HBASE_INDEX_MAX_FIELD_LENGTH =
+ "hbase.index.max.field.length";
+ static final String HBASE_INDEX_MAX_MERGE_DOCS =
+ "hbase.index.max.merge.docs";
+ static final String HBASE_INDEX_MERGE_FACTOR = "hbase.index.merge.factor";
+ // double ramBufferSizeMB;
+ static final String HBASE_INDEX_SIMILARITY_NAME =
+ "hbase.index.similarity.name";
+ static final String HBASE_INDEX_USE_COMPOUND_FILE =
+ "hbase.index.use.compound.file";
+ static final String HBASE_INDEX_OPTIMIZE = "hbase.index.optimize";
+
+ public static class ColumnConf extends Properties {
+ boolean getBoolean(String name, boolean defaultValue) {
+ String valueString = getProperty(name);
+ if ("true".equals(valueString))
+ return true;
+ else if ("false".equals(valueString))
+ return false;
+ else
+ return defaultValue;
+ }
+
+ void setBoolean(String name, boolean value) {
+ setProperty(name, Boolean.toString(value));
+ }
+
+ float getFloat(String name, float defaultValue) {
+ String valueString = getProperty(name);
+ if (valueString == null)
+ return defaultValue;
+ try {
+ return Float.parseFloat(valueString);
+ } catch (NumberFormatException e) {
+ return defaultValue;
+ }
+ }
+
+ void setFloat(String name, float value) {
+ setProperty(name, Float.toString(value));
+ }
+ }
+
+ private HashMap<String, ColumnConf> columnMap = new HashMap<String, ColumnConf>();
+
+ public Iterator<String> columnNameIterator() {
+ return columnMap.keySet().iterator();
+ }
+
+ public boolean isIndex(String columnName) {
+ return getColumn(columnName).getBoolean(HBASE_COLUMN_INDEX, true);
+ }
+
+ public void setIndex(String columnName, boolean index) {
+ getColumn(columnName).setBoolean(HBASE_COLUMN_INDEX, index);
+ }
+
+ public boolean isStore(String columnName) {
+ return getColumn(columnName).getBoolean(HBASE_COLUMN_STORE, false);
+ }
+
+ public void setStore(String columnName, boolean store) {
+ getColumn(columnName).setBoolean(HBASE_COLUMN_STORE, store);
+ }
+
+ public boolean isTokenize(String columnName) {
+ return getColumn(columnName).getBoolean(HBASE_COLUMN_TOKENIZE, true);
+ }
+
+ public void setTokenize(String columnName, boolean tokenize) {
+ getColumn(columnName).setBoolean(HBASE_COLUMN_TOKENIZE, tokenize);
+ }
+
+ public float getBoost(String columnName) {
+ return getColumn(columnName).getFloat(HBASE_COLUMN_BOOST, 1.0f);
+ }
+
+ public void setBoost(String columnName, float boost) {
+ getColumn(columnName).setFloat(HBASE_COLUMN_BOOST, boost);
+ }
+
+ public boolean isOmitNorms(String columnName) {
+ return getColumn(columnName).getBoolean(HBASE_COLUMN_OMIT_NORMS, true);
+ }
+
+ public void setOmitNorms(String columnName, boolean omitNorms) {
+ getColumn(columnName).setBoolean(HBASE_COLUMN_OMIT_NORMS, omitNorms);
+ }
+
+ private ColumnConf getColumn(String columnName) {
+ ColumnConf column = columnMap.get(columnName);
+ if (column == null) {
+ column = new ColumnConf();
+ columnMap.put(columnName, column);
+ }
+ return column;
+ }
+
+ public String getAnalyzerName() {
+ return get(HBASE_INDEX_ANALYZER_NAME,
+ "org.apache.lucene.analysis.standard.StandardAnalyzer");
+ }
+
+ public void setAnalyzerName(String analyzerName) {
+ set(HBASE_INDEX_ANALYZER_NAME, analyzerName);
+ }
+
+ public int getMaxBufferedDeleteTerms() {
+ return getInt(HBASE_INDEX_MAX_BUFFERED_DELS, 1000);
+ }
+
+ public void setMaxBufferedDeleteTerms(int maxBufferedDeleteTerms) {
+ setInt(HBASE_INDEX_MAX_BUFFERED_DELS, maxBufferedDeleteTerms);
+ }
+
+ public int getMaxBufferedDocs() {
+ return getInt(HBASE_INDEX_MAX_BUFFERED_DOCS, 10);
+ }
+
+ public void setMaxBufferedDocs(int maxBufferedDocs) {
+ setInt(HBASE_INDEX_MAX_BUFFERED_DOCS, maxBufferedDocs);
+ }
+
+ public int getMaxFieldLength() {
+ return getInt(HBASE_INDEX_MAX_FIELD_LENGTH, Integer.MAX_VALUE);
+ }
+
+ public void setMaxFieldLength(int maxFieldLength) {
+ setInt(HBASE_INDEX_MAX_FIELD_LENGTH, maxFieldLength);
+ }
+
+ public int getMaxMergeDocs() {
+ return getInt(HBASE_INDEX_MAX_MERGE_DOCS, Integer.MAX_VALUE);
+ }
+
+ public void setMaxMergeDocs(int maxMergeDocs) {
+ setInt(HBASE_INDEX_MAX_MERGE_DOCS, maxMergeDocs);
+ }
+
+ public int getMergeFactor() {
+ return getInt(HBASE_INDEX_MERGE_FACTOR, 10);
+ }
+
+ public void setMergeFactor(int mergeFactor) {
+ setInt(HBASE_INDEX_MERGE_FACTOR, mergeFactor);
+ }
+
+ public String getRowkeyName() {
+ return get(HBASE_INDEX_ROWKEY_NAME, "ROWKEY");
+ }
+
+ public void setRowkeyName(String rowkeyName) {
+ set(HBASE_INDEX_ROWKEY_NAME, rowkeyName);
+ }
+
+ public String getSimilarityName() {
+ return get(HBASE_INDEX_SIMILARITY_NAME, null);
+ }
+
+ public void setSimilarityName(String similarityName) {
+ set(HBASE_INDEX_SIMILARITY_NAME, similarityName);
+ }
+
+ public boolean isUseCompoundFile() {
+ return getBoolean(HBASE_INDEX_USE_COMPOUND_FILE, false);
+ }
+
+ public void setUseCompoundFile(boolean useCompoundFile) {
+ setBoolean(HBASE_INDEX_USE_COMPOUND_FILE, useCompoundFile);
+ }
+
+ public boolean doOptimize() {
+ return getBoolean(HBASE_INDEX_OPTIMIZE, true);
+ }
+
+ public void setDoOptimize(boolean doOptimize) {
+ setBoolean(HBASE_INDEX_OPTIMIZE, doOptimize);
+ }
+
+ public void addFromXML(String content) {
+ try {
+ DocumentBuilder builder = DocumentBuilderFactory.newInstance()
+ .newDocumentBuilder();
+
+ Document doc = builder
+ .parse(new ByteArrayInputStream(content.getBytes()));
+
+ Element root = doc.getDocumentElement();
+ if (!"configuration".equals(root.getTagName())) {
+ LOG.fatal("bad conf file: top-level element not <configuration>");
+ }
+
+ NodeList props = root.getChildNodes();
+ for (int i = 0; i < props.getLength(); i++) {
+ Node propNode = props.item(i);
+ if (!(propNode instanceof Element)) {
+ continue;
+ }
+
+ Element prop = (Element) propNode;
+ if ("property".equals(prop.getTagName())) {
+ propertyFromXML(prop, null);
+ } else if ("column".equals(prop.getTagName())) {
+ columnConfFromXML(prop);
+ } else {
+ LOG.warn("bad conf content: element neither <property> nor <column>");
+ }
+ }
+ } catch (Exception e) {
+ LOG.fatal("error parsing conf content: " + e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void propertyFromXML(Element prop, Properties properties) {
+ NodeList fields = prop.getChildNodes();
+ String attr = null;
+ String value = null;
+
+ for (int j = 0; j < fields.getLength(); j++) {
+ Node fieldNode = fields.item(j);
+ if (!(fieldNode instanceof Element)) {
+ continue;
+ }
+
+ Element field = (Element) fieldNode;
+ if ("name".equals(field.getTagName())) {
+ attr = ((Text) field.getFirstChild()).getData();
+ }
+ if ("value".equals(field.getTagName()) && field.hasChildNodes()) {
+ value = ((Text) field.getFirstChild()).getData();
+ }
+ }
+
+ if (attr != null && value != null) {
+ if (properties == null) {
+ set(attr, value);
+ } else {
+ properties.setProperty(attr, value);
+ }
+ }
+ }
+
+ private void columnConfFromXML(Element column) {
+ ColumnConf columnConf = new ColumnConf();
+ NodeList props = column.getChildNodes();
+ for (int i = 0; i < props.getLength(); i++) {
+ Node propNode = props.item(i);
+ if (!(propNode instanceof Element)) {
+ continue;
+ }
+
+ Element prop = (Element) propNode;
+ if ("property".equals(prop.getTagName())) {
+ propertyFromXML(prop, columnConf);
+ } else {
+ LOG.warn("bad conf content: element not <property>");
+ }
+ }
+
+ if (columnConf.getProperty(HBASE_COLUMN_NAME) != null) {
+ columnMap.put(columnConf.getProperty(HBASE_COLUMN_NAME), columnConf);
+ } else {
+ LOG.warn("bad column conf: name not specified");
+ }
+ }
+
+ public void write(OutputStream out) throws IOException {
+ try {
+ Document doc = writeDocument();
+ DOMSource source = new DOMSource(doc);
+ StreamResult result = new StreamResult(out);
+ TransformerFactory transFactory = TransformerFactory.newInstance();
+ Transformer transformer = transFactory.newTransformer();
+ transformer.transform(source, result);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private Document writeDocument() {
+ Iterator<Map.Entry<String, String>> iter = iterator();
+ try {
+ Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
+ .newDocument();
+ Element conf = doc.createElement("configuration");
+ doc.appendChild(conf);
+ conf.appendChild(doc.createTextNode("\n"));
+
+ Map.Entry<String, String> entry;
+ while (iter.hasNext()) {
+ entry = iter.next();
+ String name = entry.getKey();
+ String value = entry.getValue();
+ writeProperty(doc, conf, name, value);
+ }
+
+ Iterator<String> columnIter = columnNameIterator();
+ while (columnIter.hasNext()) {
+ writeColumn(doc, conf, columnIter.next());
+ }
+
+ return doc;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void writeProperty(Document doc, Element parent, String name,
+ String value) {
+ Element propNode = doc.createElement("property");
+ parent.appendChild(propNode);
+
+ Element nameNode = doc.createElement("name");
+ nameNode.appendChild(doc.createTextNode(name));
+ propNode.appendChild(nameNode);
+
+ Element valueNode = doc.createElement("value");
+ valueNode.appendChild(doc.createTextNode(value));
+ propNode.appendChild(valueNode);
+
+ parent.appendChild(doc.createTextNode("\n"));
+ }
+
+ private void writeColumn(Document doc, Element parent, String columnName) {
+ Element column = doc.createElement("column");
+ parent.appendChild(column);
+ column.appendChild(doc.createTextNode("\n"));
+
+ ColumnConf columnConf = getColumn(columnName);
+ for (Map.Entry<Object, Object> entry : columnConf.entrySet()) {
+ if (entry.getKey() instanceof String
+ && entry.getValue() instanceof String) {
+ writeProperty(doc, column, (String) entry.getKey(), (String) entry
+ .getValue());
+ }
+ }
+ }
+
+ public String toString() {
+ StringWriter writer = new StringWriter();
+ try {
+ Document doc = writeDocument();
+ DOMSource source = new DOMSource(doc);
+ StreamResult result = new StreamResult(writer);
+ TransformerFactory transFactory = TransformerFactory.newInstance();
+ Transformer transformer = transFactory.newTransformer();
+ transformer.transform(source, result);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return writer.toString();
+ }
+}
\ No newline at end of file
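
The settings that addFromXML() reads from a file can also be assembled programmatically and handed to the job under the "hbase.index.conf" key, the same key BuildTableIndex and IndexOutputFormat use. A minimal sketch (the column name and values are illustrative):

  IndexConfiguration iconf = new IndexConfiguration();
  // per-column settings; defaults are index=true, tokenize=true,
  // store=false, omit.norms=true, boost=1.0
  iconf.setIndex("contents:", true);
  iconf.setTokenize("contents:", false);
  iconf.setStore("contents:", true);
  // index-wide settings
  iconf.setRowkeyName("key");
  iconf.setMaxFieldLength(10000);
  iconf.setUseCompoundFile(true);
  // serialize back to XML and pass it to the job configuration
  jobConf.set("hbase.index.conf", iconf.toString());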
Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexOutputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexOutputFormat.java?rev=580399&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexOutputFormat.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexOutputFormat.java Fri Sep 28 09:12:24 2007
@@ -0,0 +1,163 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormatBase;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.Progressable;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.search.Similarity;
+
+/**
+ * Create a local index, unwrap Lucene documents created by reduce, add them to
+ * the index, and copy the index to the destination.
+ */
+public class IndexOutputFormat extends
+ OutputFormatBase<Text, LuceneDocumentWrapper> {
+ static final Log LOG = LogFactory.getLog(IndexOutputFormat.class);
+
+ @Override
+ public RecordWriter<Text, LuceneDocumentWrapper> getRecordWriter(
+ final FileSystem fs, JobConf job, String name, final Progressable progress)
+ throws IOException {
+
+ final Path perm = new Path(job.getOutputPath(), name);
+ final Path temp = job.getLocalPath("index/_"
+ + Integer.toString(new Random().nextInt()));
+
+ LOG.info("To index into " + perm);
+
+ // delete old, if any
+ fs.delete(perm);
+
+ final IndexConfiguration indexConf = new IndexConfiguration();
+ String content = job.get("hbase.index.conf");
+ if (content != null) {
+ indexConf.addFromXML(content);
+ }
+
+ String analyzerName = indexConf.getAnalyzerName();
+ Analyzer analyzer;
+ try {
+ Class analyzerClass = Class.forName(analyzerName);
+ analyzer = (Analyzer) analyzerClass.newInstance();
+ } catch (Exception e) {
+ throw new IOException("Error in creating an analyzer object "
+ + analyzerName);
+ }
+
+ // build locally first
+ final IndexWriter writer = new IndexWriter(fs.startLocalOutput(perm, temp)
+ .toString(), analyzer, true);
+
+ // no delete, so no need for maxBufferedDeleteTerms
+ writer.setMaxBufferedDocs(indexConf.getMaxBufferedDocs());
+ writer.setMaxFieldLength(indexConf.getMaxFieldLength());
+ writer.setMaxMergeDocs(indexConf.getMaxMergeDocs());
+ writer.setMergeFactor(indexConf.getMergeFactor());
+ String similarityName = indexConf.getSimilarityName();
+ if (similarityName != null) {
+ try {
+ Class similarityClass = Class.forName(similarityName);
+ Similarity similarity = (Similarity) similarityClass.newInstance();
+ writer.setSimilarity(similarity);
+ } catch (Exception e) {
+ throw new IOException("Error in creating a similarty object "
+ + similarityName);
+ }
+ }
+ writer.setUseCompoundFile(indexConf.isUseCompoundFile());
+
+ return new RecordWriter<Text, LuceneDocumentWrapper>() {
+ private boolean closed;
+ private long docCount = 0;
+
+ public void write(@SuppressWarnings("unused") Text key,
+ LuceneDocumentWrapper value)
+ throws IOException {
+ // unwrap and index doc
+ Document doc = value.get();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(" Indexing [" + doc + "]");
+ }
+
+ writer.addDocument(doc);
+ docCount++;
+ progress.progress();
+ }
+
+ public void close(final Reporter reporter) throws IOException {
+ // spawn a thread to give progress heartbeats
+ Thread prog = new Thread() {
+ public void run() {
+ while (!closed) {
+ try {
+ reporter.setStatus("closing");
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ continue;
+ } catch (Throwable e) {
+ return;
+ }
+ }
+ }
+ };
+
+ try {
+ prog.start();
+
+ // optimize index
+ if (indexConf.doOptimize()) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Optimizing index.");
+ }
+ writer.optimize();
+ }
+
+ // close index
+ writer.close();
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Done indexing " + docCount + " docs.");
+ }
+
+ // copy to perm destination in dfs
+ fs.completeLocalOutput(perm, temp);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Copy done.");
+ }
+ } finally {
+ closed = true;
+ }
+ }
+ };
+ }
+}
\ No newline at end of file
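
Note that getRecordWriter() instantiates the analyzer, and optionally the similarity, reflectively via Class.forName(...).newInstance(), so any class named in the index configuration must have a public no-argument constructor. A sketch of pointing the index at another stock Lucene analyzer (the class name is from Lucene 2.x; verify it exists in the version on your classpath):

  IndexConfiguration iconf = new IndexConfiguration();
  iconf.setAnalyzerName("org.apache.lucene.analysis.WhitespaceAnalyzer");
  jobConf.set("hbase.index.conf", iconf.toString());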
Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexTableReduce.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexTableReduce.java?rev=580399&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexTableReduce.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IndexTableReduce.java Fri Sep 28 09:12:24 2007
@@ -0,0 +1,107 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.log4j.Logger;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+
+/**
+ * Construct a Lucene document per row, which is consumed by IndexOutputFormat
+ * to build a Lucene index
+ */
+public class IndexTableReduce extends MapReduceBase implements
+ Reducer<Text, MapWritable, Text, LuceneDocumentWrapper> {
+ private static final Logger LOG = Logger.getLogger(IndexTableReduce.class);
+
+ private IndexConfiguration indexConf;
+
+ public void configure(JobConf job) {
+ super.configure(job);
+ indexConf = new IndexConfiguration();
+ String content = job.get("hbase.index.conf");
+ if (content != null) {
+ indexConf.addFromXML(content);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Index conf: " + indexConf);
+ }
+ }
+
+ public void close() throws IOException {
+ super.close();
+ }
+
+ public void reduce(Text key, Iterator<MapWritable> values,
+ OutputCollector<Text, LuceneDocumentWrapper> output, Reporter reporter)
+ throws IOException {
+ if (!values.hasNext()) {
+ return;
+ }
+
+ Document doc = new Document();
+
+ // index and store row key, row key already UTF-8 encoded
+ Field keyField = new Field(indexConf.getRowkeyName(), key.toString(),
+ Field.Store.YES, Field.Index.UN_TOKENIZED);
+ keyField.setOmitNorms(true);
+ doc.add(keyField);
+
+ while (values.hasNext()) {
+ MapWritable value = values.next();
+
+ // each column (name-value pair) is a field (name-value pair)
+ for (Map.Entry<Writable, Writable> entry : value.entrySet()) {
+ // name is already UTF-8 encoded
+ String column = ((Text) entry.getKey()).toString();
+ byte[] columnValue = ((ImmutableBytesWritable)entry.getValue()).get();
+ Field.Store store = indexConf.isStore(column)?
+ Field.Store.YES: Field.Store.NO;
+ Field.Index index = indexConf.isIndex(column)?
+ (indexConf.isTokenize(column)?
+ Field.Index.TOKENIZED: Field.Index.UN_TOKENIZED):
+ Field.Index.NO;
+
+ // UTF-8 encode value
+ Field field = new Field(column, new String(columnValue,
+ HConstants.UTF8_ENCODING), store, index);
+ field.setBoost(indexConf.getBoost(column));
+ field.setOmitNorms(indexConf.isOmitNorms(column));
+
+ doc.add(field);
+ }
+ }
+ output.collect(key, new LuceneDocumentWrapper(doc));
+ }
+}
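
With a default IndexConfiguration, the document built above for a single row is roughly equivalent to the following (the row key and cell value are illustrative; uses org.apache.lucene.document.Document and Field from Lucene 2.x):

  Document doc = new Document();
  // row key: stored, untokenized, norms omitted; field name defaults to "ROWKEY"
  Field keyField = new Field("ROWKEY", "row_00001",
      Field.Store.YES, Field.Index.UN_TOKENIZED);
  keyField.setOmitNorms(true);
  doc.add(keyField);
  // one field per column; defaults: indexed, tokenized, not stored,
  // boost 1.0, norms omitted
  Field contents = new Field("contents:", "some cell value",
      Field.Store.NO, Field.Index.TOKENIZED);
  contents.setBoost(1.0f);
  contents.setOmitNorms(true);
  doc.add(contents);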
Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/LuceneDocumentWrapper.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/LuceneDocumentWrapper.java?rev=580399&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/LuceneDocumentWrapper.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/LuceneDocumentWrapper.java Fri Sep 28 09:12:24 2007
@@ -0,0 +1,51 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.lucene.document.Document;
+
+/**
+ * A utility class used to pass a Lucene document from the reduce phase to the
+ * OutputFormat. It does not actually serialize or deserialize the document.
+ */
+class LuceneDocumentWrapper implements Writable {
+ private Document doc;
+
+ public LuceneDocumentWrapper(Document doc) {
+ this.doc = doc;
+ }
+
+ public Document get() {
+ return doc;
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ // intentionally left blank
+ }
+
+ public void write(DataOutput out) throws IOException {
+ // intentionally left blank
+ }
+}
\ No newline at end of file
Added: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java?rev=580399&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableIndex.java Fri Sep 28 09:12:24 2007
@@ -0,0 +1,297 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+
+import junit.framework.TestSuite;
+import junit.textui.TestRunner;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseAdmin;
+import org.apache.hadoop.hbase.HBaseTestCase;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HScannerInterface;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.HTable;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.MultiRegionTable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.MultiSearcher;
+import org.apache.lucene.search.Searchable;
+import org.apache.lucene.search.Searcher;
+import org.apache.lucene.search.TermQuery;
+
+/**
+ * Test Map/Reduce job to build index over HBase table
+ */
+public class TestTableIndex extends HBaseTestCase {
+ private static final Log LOG = LogFactory.getLog(TestTableIndex.class);
+
+ static final String TABLE_NAME = "moretest";
+ static final String INPUT_COLUMN = "contents:";
+ static final Text TEXT_INPUT_COLUMN = new Text(INPUT_COLUMN);
+ static final String OUTPUT_COLUMN = "text:";
+ static final Text TEXT_OUTPUT_COLUMN = new Text(OUTPUT_COLUMN);
+ static final String ROWKEY_NAME = "key";
+ static final String INDEX_DIR = "testindex";
+
+ private HTableDescriptor desc;
+
+ private MiniDFSCluster dfsCluster = null;
+ private FileSystem fs;
+ private Path dir;
+ private MiniHBaseCluster hCluster = null;
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+
+ // This size should make it so we always split using the addContent
+ // below. After adding all data, the first region is 1.3M
+ conf.setLong("hbase.hregion.max.filesize", 256 * 1024);
+
+ desc = new HTableDescriptor(TABLE_NAME);
+ desc.addFamily(new HColumnDescriptor(INPUT_COLUMN));
+ desc.addFamily(new HColumnDescriptor(OUTPUT_COLUMN));
+
+ dfsCluster = new MiniDFSCluster(conf, 1, true, (String[]) null);
+ try {
+ fs = dfsCluster.getFileSystem();
+
+ dir = new Path("/hbase");
+ fs.mkdirs(dir);
+
+ // Start up HBase cluster
+ hCluster = new MiniHBaseCluster(conf, 1, dfsCluster);
+
+ // Create a table.
+ HBaseAdmin admin = new HBaseAdmin(conf);
+ admin.createTable(desc);
+
+ // Populate a table into multiple regions
+ MultiRegionTable.makeMultiRegionTable(conf, hCluster, null, TABLE_NAME,
+ INPUT_COLUMN);
+
+ // Verify table indeed has multiple regions
+ HTable table = new HTable(conf, new Text(TABLE_NAME));
+ Text[] startKeys = table.getStartKeys();
+ assertTrue(startKeys.length > 1);
+ } catch (Exception e) {
+ if (dfsCluster != null) {
+ dfsCluster.shutdown();
+ dfsCluster = null;
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ super.tearDown();
+
+ if (hCluster != null) {
+ hCluster.shutdown();
+ }
+
+ if (dfsCluster != null) {
+ dfsCluster.shutdown();
+ }
+ }
+
+ /**
+ * Test HBase map/reduce
+ *
+ * @throws IOException
+ */
+ @SuppressWarnings("static-access")
+ public void testTableIndex() throws IOException {
+ long firstK = 32;
+ LOG.info("Print table contents before map/reduce");
+ scanTable(conf, firstK);
+
+ @SuppressWarnings("deprecation")
+ MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1);
+
+ // set configuration parameter for index build
+ conf.set("hbase.index.conf", createIndexConfContent());
+
+ try {
+ JobConf jobConf = new JobConf(conf, TestTableIndex.class);
+ jobConf.setJobName("index column contents");
+ jobConf.setNumMapTasks(2);
+ // number of indexes to partition into
+ jobConf.setNumReduceTasks(1);
+
+ // use identity map (a waste, but just as an example)
+ IdentityTableMap.initJob(TABLE_NAME, INPUT_COLUMN,
+ IdentityTableMap.class, jobConf);
+
+ // use IndexTableReduce to build a Lucene index
+ jobConf.setReducerClass(IndexTableReduce.class);
+ jobConf.setOutputPath(new Path(INDEX_DIR));
+ jobConf.setOutputFormat(IndexOutputFormat.class);
+
+ JobClient.runJob(jobConf);
+
+ } finally {
+ mrCluster.shutdown();
+ }
+
+ LOG.info("Print table contents after map/reduce");
+ scanTable(conf, firstK);
+
+ // verify index results
+ verify(conf);
+ }
+
+ private String createIndexConfContent() {
+ StringBuffer buffer = new StringBuffer();
+ buffer.append("<configuration><column><property>" +
+ "<name>hbase.column.name</name><value>" + INPUT_COLUMN +
+ "</value></property>");
+ buffer.append("<property><name>hbase.column.store</name> " +
+ "<value>true</value></property>");
+ buffer.append("<property><name>hbase.column.index</name>" +
+ "<value>true</value></property>");
+ buffer.append("<property><name>hbase.column.tokenize</name>" +
+ "<value>false</value></property>");
+ buffer.append("<property><name>hbase.column.boost</name>" +
+ "<value>3</value></property>");
+ buffer.append("<property><name>hbase.column.omit.norms</name>" +
+ "<value>false</value></property></column>");
+ buffer.append("<property><name>hbase.index.rowkey.name</name><value>" +
+ ROWKEY_NAME + "</value></property>");
+ buffer.append("<property><name>hbase.index.max.buffered.docs</name>" +
+ "<value>500</value></property>");
+ buffer.append("<property><name>hbase.index.max.field.length</name>" +
+ "<value>10000</value></property>");
+ buffer.append("<property><name>hbase.index.merge.factor</name>" +
+ "<value>10</value></property>");
+ buffer.append("<property><name>hbase.index.use.compound.file</name>" +
+ "<value>true</value></property>");
+ buffer.append("<property><name>hbase.index.optimize</name>" +
+ "<value>true</value></property></configuration>");
+
+ IndexConfiguration c = new IndexConfiguration();
+ c.addFromXML(buffer.toString());
+ return c.toString();
+ }
+
+ private void scanTable(Configuration c, long firstK) throws IOException {
+ HTable table = new HTable(c, new Text(TABLE_NAME));
+ Text[] columns = { TEXT_INPUT_COLUMN, TEXT_OUTPUT_COLUMN };
+ HScannerInterface scanner = table.obtainScanner(columns,
+ HConstants.EMPTY_START_ROW);
+ long count = 0;
+ try {
+ HStoreKey key = new HStoreKey();
+ TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+ while (scanner.next(key, results)) {
+ if (count < firstK)
+ LOG.info("row: " + key.getRow());
+ for (Map.Entry<Text, byte[]> e : results.entrySet()) {
+ if (count < firstK)
+ LOG.info(" column: " + e.getKey() + " value: "
+ + new String(e.getValue(), HConstants.UTF8_ENCODING));
+ }
+ count++;
+ }
+ } finally {
+ scanner.close();
+ }
+ }
+
+ private void verify(Configuration c) throws IOException {
+ Path localDir = new Path(this.testDir, "index_" +
+ Integer.toString(new Random().nextInt()));
+ this.fs.copyToLocalFile(new Path(INDEX_DIR), localDir);
+ Path [] indexDirs = this.localFs.listPaths(new Path [] {localDir});
+ Searcher searcher = null;
+ HScannerInterface scanner = null;
+ try {
+ if (indexDirs.length == 1) {
+ searcher = new IndexSearcher((new File(indexDirs[0].
+ toUri())).getAbsolutePath());
+ } else if (indexDirs.length > 1) {
+ Searchable[] searchers = new Searchable[indexDirs.length];
+ for (int i = 0; i < indexDirs.length; i++) {
+ searchers[i] = new IndexSearcher((new File(indexDirs[i].
+ toUri()).getAbsolutePath()));
+ }
+ searcher = new MultiSearcher(searchers);
+ } else {
+ throw new IOException("no index directory found");
+ }
+
+ HTable table = new HTable(c, new Text(TABLE_NAME));
+ Text[] columns = { TEXT_INPUT_COLUMN, TEXT_OUTPUT_COLUMN };
+ scanner = table.obtainScanner(columns, HConstants.EMPTY_START_ROW);
+
+ HStoreKey key = new HStoreKey();
+ TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+
+ IndexConfiguration indexConf = new IndexConfiguration();
+ String content = c.get("hbase.index.conf");
+ if (content != null) {
+ indexConf.addFromXML(content);
+ }
+ String rowkeyName = indexConf.getRowkeyName();
+
+ int count = 0;
+ while (scanner.next(key, results)) {
+ String value = key.getRow().toString();
+ Term term = new Term(rowkeyName, value);
+ int hitCount = searcher.search(new TermQuery(term)).length();
+ assertEquals("check row " + value, 1, hitCount);
+ count++;
+ }
+ int maxDoc = searcher.maxDoc();
+ assertEquals("check number of rows", count, maxDoc);
+ } finally {
+ if (null != searcher)
+ searcher.close();
+ if (null != scanner)
+ scanner.close();
+ }
+ }
+ /**
+ * @param args unused
+ */
+ public static void main(@SuppressWarnings("unused") String[] args) {
+ TestRunner.run(new TestSuite(TestTableIndex.class));
+ }
+}
\ No newline at end of file
Added: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java?rev=580399&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java Fri Sep 28 09:12:24 2007
@@ -0,0 +1,387 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapred;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.hbase.HBaseAdmin;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HScannerInterface;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.HTable;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.MultiRegionTable;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapred.TableMap;
+import org.apache.hadoop.hbase.mapred.TableOutputCollector;
+import org.apache.hadoop.hbase.mapred.TableReduce;
+import org.apache.hadoop.hbase.mapred.IdentityTableReduce;
+
+/**
+ * Test Map/Reduce job over HBase tables
+ */
+public class TestTableMapReduce extends MultiRegionTable {
+ @SuppressWarnings("hiding")
+ private static final Log LOG =
+ LogFactory.getLog(TestTableMapReduce.class.getName());
+
+ static final String SINGLE_REGION_TABLE_NAME = "srtest";
+ static final String MULTI_REGION_TABLE_NAME = "mrtest";
+ static final String INPUT_COLUMN = "contents:";
+ static final Text TEXT_INPUT_COLUMN = new Text(INPUT_COLUMN);
+ static final String OUTPUT_COLUMN = "text:";
+ static final Text TEXT_OUTPUT_COLUMN = new Text(OUTPUT_COLUMN);
+
+ private MiniDFSCluster dfsCluster = null;
+ private FileSystem fs;
+ private Path dir;
+ private MiniHBaseCluster hCluster = null;
+
+ private static byte[][] values = null;
+
+ static {
+ try {
+ values = new byte[][] {
+ "0123".getBytes(HConstants.UTF8_ENCODING),
+ "abcd".getBytes(HConstants.UTF8_ENCODING),
+ "wxyz".getBytes(HConstants.UTF8_ENCODING),
+ "6789".getBytes(HConstants.UTF8_ENCODING)
+ };
+ } catch (UnsupportedEncodingException e) {
+ fail();
+ }
+ }
+
+ /** constructor */
+ public TestTableMapReduce() {
+ super();
+
+ // Make lease timeout longer, lease checks less frequent
+ conf.setInt("hbase.master.lease.period", 10 * 1000);
+ conf.setInt("hbase.master.lease.thread.wakefrequency", 5 * 1000);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ // This size is picked so the table is split into two
+ // after addContent in localTestMultiRegionTable.
+ conf.setLong("hbase.hregion.max.filesize", 256 * 1024);
+ dfsCluster = new MiniDFSCluster(conf, 1, true, (String[])null);
+ try {
+ fs = dfsCluster.getFileSystem();
+ dir = new Path("/hbase");
+ fs.mkdirs(dir);
+ // Start up HBase cluster
+ hCluster = new MiniHBaseCluster(conf, 1, dfsCluster);
+ LOG.info("Master is at " + this.conf.get(HConstants.MASTER_ADDRESS));
+ } catch (Exception e) {
+ if (dfsCluster != null) {
+ dfsCluster.shutdown();
+ dfsCluster = null;
+ }
+ throw e;
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void tearDown() throws Exception {
+ super.tearDown();
+ if(hCluster != null) {
+ hCluster.shutdown();
+ }
+
+ if (dfsCluster != null) {
+ dfsCluster.shutdown();
+ }
+
+ if (fs != null) {
+ try {
+ fs.close();
+ } catch (IOException e) {
+ LOG.info("During tear down got a " + e.getMessage());
+ }
+ }
+ }
+
+ /**
+ * Pass the given key and processed record to reduce
+ */
+ public static class ProcessContentsMapper extends TableMap {
+
+ /** constructor */
+ public ProcessContentsMapper() {
+ super();
+ }
+
+ /**
+ * Pass the key and the reversed value to reduce
+ *
+ * @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.io.MapWritable, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter)
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public void map(HStoreKey key, MapWritable value,
+ TableOutputCollector output,
+ @SuppressWarnings("unused") Reporter reporter) throws IOException {
+
+ Text tKey = key.getRow();
+
+ if(value.size() != 1) {
+ throw new IOException("There should only be one input column");
+ }
+
+ Text[] keys = value.keySet().toArray(new Text[value.size()]);
+ if(!keys[0].equals(TEXT_INPUT_COLUMN)) {
+ throw new IOException("Wrong input column. Expected: " + INPUT_COLUMN
+ + " but got: " + keys[0]);
+ }
+
+ // Get the original value and reverse it
+
+ String originalValue =
+ new String(((ImmutableBytesWritable)value.get(keys[0])).get(),
+ HConstants.UTF8_ENCODING);
+ StringBuilder newValue = new StringBuilder();
+ for(int i = originalValue.length() - 1; i >= 0; i--) {
+ newValue.append(originalValue.charAt(i));
+ }
+
+ // Now set the value to be collected
+
+ MapWritable outval = new MapWritable();
+ outval.put(TEXT_OUTPUT_COLUMN, new ImmutableBytesWritable(
+ newValue.toString().getBytes(HConstants.UTF8_ENCODING)));
+
+ output.collect(tKey, outval);
+ }
+ }
+
+ /**
+ * Test hbase mapreduce jobs against single region and multi-region tables.
+ * @throws IOException
+ */
+ public void testTableMapReduce() throws IOException {
+ localTestSingleRegionTable();
+ localTestMultiRegionTable();
+ }
+
+ /*
+ * Test against a single region.
+ * @throws IOException
+ */
+ private void localTestSingleRegionTable() throws IOException {
+ HTableDescriptor desc = new HTableDescriptor(SINGLE_REGION_TABLE_NAME);
+ desc.addFamily(new HColumnDescriptor(INPUT_COLUMN));
+ desc.addFamily(new HColumnDescriptor(OUTPUT_COLUMN));
+
+ // Create a table.
+ HBaseAdmin admin = new HBaseAdmin(this.conf);
+ admin.createTable(desc);
+
+ // insert some data into the test table
+ HTable table = new HTable(conf, new Text(SINGLE_REGION_TABLE_NAME));
+
+ for(int i = 0; i < values.length; i++) {
+ long lockid = table.startUpdate(new Text("row_"
+ + String.format("%1$05d", i)));
+
+ try {
+ table.put(lockid, TEXT_INPUT_COLUMN, values[i]);
+ table.commit(lockid, System.currentTimeMillis());
+ lockid = -1;
+ } finally {
+ if (lockid != -1)
+ table.abort(lockid);
+ }
+ }
+
+ LOG.info("Print table contents before map/reduce");
+ scanTable(conf, SINGLE_REGION_TABLE_NAME);
+
+ @SuppressWarnings("deprecation")
+ MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1);
+
+ try {
+ JobConf jobConf = new JobConf(conf, TestTableMapReduce.class);
+ jobConf.setJobName("process column contents");
+ jobConf.setNumMapTasks(1);
+ jobConf.setNumReduceTasks(1);
+
+ TableMap.initJob(SINGLE_REGION_TABLE_NAME, INPUT_COLUMN,
+ ProcessContentsMapper.class, jobConf);
+
+ TableReduce.initJob(SINGLE_REGION_TABLE_NAME,
+ IdentityTableReduce.class, jobConf);
+
+ JobClient.runJob(jobConf);
+
+ } finally {
+ mrCluster.shutdown();
+ }
+
+ LOG.info("Print table contents after map/reduce");
+ scanTable(conf, SINGLE_REGION_TABLE_NAME);
+
+ // verify map-reduce results
+ verify(conf, SINGLE_REGION_TABLE_NAME);
+ }
+
+ /*
+ * Test against multiple regions.
+ * @throws IOException
+ */
+ private void localTestMultiRegionTable() throws IOException {
+ HTableDescriptor desc = new HTableDescriptor(MULTI_REGION_TABLE_NAME);
+ desc.addFamily(new HColumnDescriptor(INPUT_COLUMN));
+ desc.addFamily(new HColumnDescriptor(OUTPUT_COLUMN));
+
+ // Create a table.
+ HBaseAdmin admin = new HBaseAdmin(this.conf);
+ admin.createTable(desc);
+
+ // Populate a table into multiple regions
+ MultiRegionTable.makeMultiRegionTable(conf, hCluster, fs,
+ MULTI_REGION_TABLE_NAME, INPUT_COLUMN);
+
+ // Verify table indeed has multiple regions
+ HTable table = new HTable(conf, new Text(MULTI_REGION_TABLE_NAME));
+ Text[] startKeys = table.getStartKeys();
+ assertTrue(startKeys.length > 1);
+
+ @SuppressWarnings("deprecation")
+ MiniMRCluster mrCluster = new MiniMRCluster(2, fs.getUri().toString(), 1);
+
+ try {
+ JobConf jobConf = new JobConf(conf, TestTableMapReduce.class);
+ jobConf.setJobName("process column contents");
+ jobConf.setNumMapTasks(2);
+ jobConf.setNumReduceTasks(1);
+
+ TableMap.initJob(MULTI_REGION_TABLE_NAME, INPUT_COLUMN,
+ ProcessContentsMapper.class, jobConf);
+
+ TableReduce.initJob(MULTI_REGION_TABLE_NAME,
+ IdentityTableReduce.class, jobConf);
+
+ JobClient.runJob(jobConf);
+
+ } finally {
+ mrCluster.shutdown();
+ }
+
+ // verify map-reduce results
+ verify(conf, MULTI_REGION_TABLE_NAME);
+ }
+
+ private void scanTable(Configuration conf, String tableName)
+ throws IOException {
+ HTable table = new HTable(conf, new Text(tableName));
+
+ Text[] columns = {
+ TEXT_INPUT_COLUMN,
+ TEXT_OUTPUT_COLUMN
+ };
+ HScannerInterface scanner =
+ table.obtainScanner(columns, HConstants.EMPTY_START_ROW);
+
+ try {
+ HStoreKey key = new HStoreKey();
+ TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+
+ while(scanner.next(key, results)) {
+ LOG.info("row: " + key.getRow());
+
+ for(Map.Entry<Text, byte[]> e: results.entrySet()) {
+ LOG.info(" column: " + e.getKey() + " value: "
+ + new String(e.getValue(), HConstants.UTF8_ENCODING));
+ }
+ }
+
+ } finally {
+ scanner.close();
+ }
+ }
+
+ @SuppressWarnings("null")
+ private void verify(Configuration conf, String tableName) throws IOException {
+ HTable table = new HTable(conf, new Text(tableName));
+
+ Text[] columns = {
+ TEXT_INPUT_COLUMN,
+ TEXT_OUTPUT_COLUMN
+ };
+ HScannerInterface scanner =
+ table.obtainScanner(columns, HConstants.EMPTY_START_ROW);
+
+ try {
+ HStoreKey key = new HStoreKey();
+ TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+
+ while(scanner.next(key, results)) {
+ byte[] firstValue = null;
+ byte[] secondValue = null;
+ int count = 0;
+
+ for(Map.Entry<Text, byte[]> e: results.entrySet()) {
+ if (count == 0)
+ firstValue = e.getValue();
+ if (count == 1)
+ secondValue = e.getValue();
+ count++;
+ }
+
+ // verify second value is the reverse of the first
+ assertNotNull(firstValue);
+ assertNotNull(secondValue);
+ assertEquals(firstValue.length, secondValue.length);
+ for (int i=0; i<firstValue.length; i++) {
+ assertEquals(firstValue[i], secondValue[firstValue.length-i-1]);
+ }
+ }
+
+ } finally {
+ scanner.close();
+ }
+ }
+}