Posted to commits@hbase.apache.org by ap...@apache.org on 2008/11/17 19:34:08 UTC
svn commit: r718317 - in /hadoop/hbase/trunk: ./
src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/client/
src/java/org/apache/hadoop/hbase/client/tableindexed/
src/java/org/apache/hadoop/hbase/ipc/ src/java/org/apache/hadoop/hbase/reg...
Author: apurtell
Date: Mon Nov 17 10:34:07 2008
New Revision: 718317
URL: http://svn.apache.org/viewvc?rev=718317&view=rev
Log:
HBASE-883 Secondary indexes
Added:
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/WritableComparator.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/tableindexed/
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexKeyGenerator.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexNotFoundException.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexSpecification.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTableAdmin.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/tableindexed/ReverseByteArrayComparator.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/tableindexed/SimpleIndexKeyGenerator.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/tableindexed/package.html
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/IndexedRegionInterface.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegionServer.java
hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/tableindexed/
hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/tableindexed/TestIndexedTable.java
Modified:
hadoop/hbase/trunk/CHANGES.txt
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HStoreKey.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HTableDescriptor.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/UnmodifyableHTableDescriptor.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java
hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java
Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=718317&r1=718316&r2=718317&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Mon Nov 17 10:34:07 2008
@@ -127,6 +127,7 @@
HBASE-875 Use MurmurHash instead of JenkinsHash [in bloomfilters]
(Andrzej Bialecki via Stack)
HBASE-625 Metrics support for cluster load history: emissions and graphs
+ HBASE-883 Secondary indexes (Clint Morgan via Andrew Purtell)
OPTIMIZATIONS
HBASE-748 Add an efficient way to batch update many rows
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HStoreKey.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HStoreKey.java?rev=718317&r1=718316&r2=718317&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HStoreKey.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HStoreKey.java Mon Nov 17 10:34:07 2008
@@ -529,6 +529,9 @@
if(rowCompare == 0)
rowCompare = Bytes.compareTo(keysA[1], keysB[1]);
return rowCompare;
+ }
+ if (regionInfo != null && regionInfo.getTableDesc().getRowKeyComparator() != null) {
+ return regionInfo.getTableDesc().getRowKeyComparator().compare(rowA, rowB);
}
return Bytes.compareTo(rowA, rowB);
}
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HTableDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HTableDescriptor.java?rev=718317&r1=718316&r2=718317&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HTableDescriptor.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/HTableDescriptor.java Mon Nov 17 10:34:07 2008
@@ -19,8 +19,12 @@
*/
package org.apache.hadoop.hbase;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.DataInput;
+import java.io.DataInputStream;
import java.io.DataOutput;
+import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
@@ -29,8 +33,10 @@
import java.util.Map;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.tableindexed.IndexSpecification;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.WritableComparable;
/**
@@ -41,7 +47,8 @@
// Changes prior to version 3 were not recorded here.
// Version 3 adds metadata as a map where keys and values are byte[].
- public static final byte TABLE_DESCRIPTOR_VERSION = 3;
+ // Version 4 adds indexes
+ public static final byte TABLE_DESCRIPTOR_VERSION = 4;
private byte [] name = HConstants.EMPTY_BYTE_ARRAY;
private String nameAsString = "";
@@ -66,9 +73,13 @@
public static final ImmutableBytesWritable IS_ROOT_KEY =
new ImmutableBytesWritable(Bytes.toBytes(IS_ROOT));
public static final String IS_META = "IS_META";
+
+ public static final String ROW_KEY_COMPARATOR = "ROW_KEY_COMPARATOR";
+
public static final ImmutableBytesWritable IS_META_KEY =
new ImmutableBytesWritable(Bytes.toBytes(IS_META));
+
// The below are ugly but better than creating them each time till we
// replace booleans being saved as Strings with plain booleans. Need a
// migration script to do this. TODO.
@@ -90,6 +101,10 @@
private final Map<Integer, HColumnDescriptor> families =
new HashMap<Integer, HColumnDescriptor>();
+ // Key is indexId
+ private final Map<String, IndexSpecification> indexes =
+ new HashMap<String, IndexSpecification>();
+
/**
* Private constructor used internally creating table descriptors for
* catalog tables: e.g. .META. and -ROOT-.
@@ -108,6 +123,7 @@
* catalog tables: e.g. .META. and -ROOT-.
*/
protected HTableDescriptor(final byte [] name, HColumnDescriptor[] families,
+ Collection<IndexSpecification> indexes,
Map<ImmutableBytesWritable,ImmutableBytesWritable> values) {
this.name = name.clone();
this.nameAsString = Bytes.toString(this.name);
@@ -115,6 +131,9 @@
for(HColumnDescriptor descriptor : families) {
this.families.put(Bytes.mapKey(descriptor.getName()), descriptor);
}
+ for(IndexSpecification index : indexes) {
+ this.indexes.put(index.getIndexId(), index);
+ }
for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> entry:
values.entrySet()) {
this.values.put(entry.getKey(), entry.getValue());
@@ -404,7 +423,7 @@
return Integer.valueOf(Bytes.toString(value)).intValue();
return DEFAULT_MEMCACHE_FLUSH_SIZE;
}
-
+
/**
* @param memcacheFlushSize memory cache flush size for each hregion
*/
@@ -412,6 +431,58 @@
setValue(MEMCACHE_FLUSHSIZE_KEY,
Bytes.toBytes(Integer.toString(memcacheFlushSize)));
}
+
+
+ public void setRowKeyComparator(WritableComparator<byte[]> newComparator) {
+ if (newComparator == null) {
+ return;
+ }
+
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+ HBaseConfiguration conf = new HBaseConfiguration();
+ try {
+ ObjectWritable.writeObject(dos, newComparator, WritableComparator.class, conf);
+ dos.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ setValue(ROW_KEY_COMPARATOR.getBytes(), bos.toByteArray());
+ this.comparator = newComparator;
+ }
+
+ private WritableComparator<byte[]> comparator = null;
+ public WritableComparator<byte[]> getRowKeyComparator() {
+ if (comparator != null) {
+ return comparator;
+ }
+ byte[] bytes = getValue(ROW_KEY_COMPARATOR.getBytes());
+ if (bytes == null) {
+ return null;
+ }
+ ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+ DataInputStream in = new DataInputStream(bis);
+ HBaseConfiguration conf = new HBaseConfiguration();
+ try {
+ comparator = (WritableComparator<byte[]>) ObjectWritable.readObject(in, conf);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return comparator;
+ }
+
+
+ public Collection<IndexSpecification> getIndexes() {
+ return indexes.values();
+ }
+
+ public IndexSpecification getIndex(String indexId) {
+ return indexes.get(indexId);
+ }
+
+ public void addIndex(IndexSpecification index) {
+ indexes.put(index.getIndexId(), index);
+ }
/**
* Adds a column family.
@@ -519,6 +590,16 @@
c.readFields(in);
families.put(Bytes.mapKey(c.getName()), c);
}
+ indexes.clear();
+ if (version < 4) {
+ return;
+ }
+ int numIndexes = in.readInt();
+ for (int i = 0; i < numIndexes; i++) {
+ IndexSpecification index = new IndexSpecification();
+ index.readFields(in);
+ addIndex(index);
+ }
}
public void write(DataOutput out) throws IOException {
@@ -538,6 +619,10 @@
HColumnDescriptor family = it.next();
family.write(out);
}
+ out.writeInt(indexes.size());
+ for(IndexSpecification index : indexes.values()) {
+ index.write(out);
+ }
}
// Comparable
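
The comparator is stored in the descriptor's metadata map via ObjectWritable, so any comparator class must have a no-arg constructor and be on the region server classpath. A minimal round-trip sketch, assuming the String-name HTableDescriptor constructor; the table name "demo" is hypothetical (ReverseByteArrayComparator is added later in this commit):

    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.client.tableindexed.ReverseByteArrayComparator;

    public class ComparatorRoundTripExample {
      public static void main(String[] args) {
        HTableDescriptor desc = new HTableDescriptor("demo"); // hypothetical table name
        // setRowKeyComparator() serializes the comparator under the
        // ROW_KEY_COMPARATOR metadata key; getRowKeyComparator() lazily
        // deserializes it again on first access.
        desc.setRowKeyComparator(new ReverseByteArrayComparator());
        System.out.println(desc.getRowKeyComparator() != null); // prints true
      }
    }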
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/WritableComparator.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/WritableComparator.java?rev=718317&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/WritableComparator.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/WritableComparator.java Mon Nov 17 10:34:07 2008
@@ -0,0 +1,28 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.util.Comparator;
+
+import org.apache.hadoop.io.Writable;
+
+public interface WritableComparator<T> extends Writable, Comparator<T> {
+// No methods, just bring the two interfaces together
+}
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/UnmodifyableHTableDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/UnmodifyableHTableDescriptor.java?rev=718317&r1=718316&r2=718317&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/UnmodifyableHTableDescriptor.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/UnmodifyableHTableDescriptor.java Mon Nov 17 10:34:07 2008
@@ -22,6 +22,7 @@
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.tableindexed.IndexSpecification;
/**
* Read-only table descriptor.
@@ -37,7 +38,7 @@
* @param desc
*/
UnmodifyableHTableDescriptor(final HTableDescriptor desc) {
- super(desc.getName(), getUnmodifyableFamilies(desc), desc.getValues());
+ super(desc.getName(), getUnmodifyableFamilies(desc), desc.getIndexes(), desc.getValues());
}
/*
@@ -108,4 +109,9 @@
public void setMemcacheFlushSize(int memcacheFlushSize) {
throw new UnsupportedOperationException("HTableDescriptor is read-only");
}
+
+ @Override
+ public void addIndex(IndexSpecification index) {
+ throw new UnsupportedOperationException("HTableDescriptor is read-only");
+ }
}
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexKeyGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexKeyGenerator.java?rev=718317&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexKeyGenerator.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexKeyGenerator.java Mon Nov 17 10:34:07 2008
@@ -0,0 +1,29 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.tableindexed;
+
+import java.util.Map;
+
+import org.apache.hadoop.io.Writable;
+
+public interface IndexKeyGenerator extends Writable {
+
+ byte [] createIndexKey(byte [] rowKey, Map<byte [], byte []> columns);
+}
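
Implementations must be Writable so a spec can round-trip through the table metadata; SimpleIndexKeyGenerator later in this commit is the in-tree example. As a hedged sketch of a custom generator (not part of this commit, column names hypothetical), one that concatenates two column values ahead of the base row key:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.util.Map;

    import org.apache.hadoop.hbase.client.tableindexed.IndexKeyGenerator;
    import org.apache.hadoop.hbase.util.Bytes;

    /** Hypothetical generator concatenating two column values and the base row. */
    public class TwoColumnKeyGenerator implements IndexKeyGenerator {
      private byte[] first;
      private byte[] second;

      public TwoColumnKeyGenerator() {
        // For Writable
      }

      public TwoColumnKeyGenerator(byte[] first, byte[] second) {
        this.first = first;
        this.second = second;
      }

      public byte[] createIndexKey(byte[] rowKey, Map<byte[], byte[]> columns) {
        // Assumes both columns are present (the region only calls this when
        // all indexed columns have values). Index rows sort by the first
        // column, then the second, then the base row key, which keeps
        // index rows unique per base row.
        return Bytes.add(columns.get(first),
            Bytes.add(columns.get(second), rowKey));
      }

      public void readFields(DataInput in) throws IOException {
        first = Bytes.readByteArray(in);
        second = Bytes.readByteArray(in);
      }

      public void write(DataOutput out) throws IOException {
        Bytes.writeByteArray(out, first);
        Bytes.writeByteArray(out, second);
      }
    }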
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexNotFoundException.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexNotFoundException.java?rev=718317&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexNotFoundException.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexNotFoundException.java Mon Nov 17 10:34:07 2008
@@ -0,0 +1,45 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.tableindexed;
+
+import java.io.IOException;
+
+/**
+ * Thrown when asking for an index that does not exist.
+ */
+public class IndexNotFoundException extends IOException {
+
+ public IndexNotFoundException() {
+ super();
+ }
+
+ public IndexNotFoundException(String arg0) {
+ super(arg0);
+ }
+
+ public IndexNotFoundException(Throwable arg0) {
+ super(arg0.getMessage());
+ }
+
+ public IndexNotFoundException(String arg0, Throwable arg1) {
+    super(arg0 + ": " + arg1.getMessage());
+ }
+
+}
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexSpecification.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexSpecification.java?rev=718317&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexSpecification.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexSpecification.java Mon Nov 17 10:34:07 2008
@@ -0,0 +1,200 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.tableindexed;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.WritableComparator;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/** Holds the specification for a single secondary index. */
+public class IndexSpecification implements Writable {
+
+ // Columns that are indexed (part of the indexRowKey)
+ private byte[][] indexedColumns;
+
+  // Constructs the index row key from the base row and the indexed column values.
+ private IndexKeyGenerator keyGenerator;
+
+ private WritableComparator<byte[]> keyComparator;
+
+ // Additional columns mapped into the indexed row. These will be available for
+ // filters when scanning the index.
+ private byte[][] additionalColumns;
+
+ private byte[][] allColumns;
+
+ // Id of this index, unique within a table.
+ private String indexId;
+
+  /** Construct a "simple" index spec for a single column. */
+  public IndexSpecification(String indexId, byte[] indexedColumn,
+      boolean ascending) {
+    this(indexId, new byte[][] { indexedColumn }, null,
+        new SimpleIndexKeyGenerator(indexedColumn), ascending ? null
+            : new ReverseByteArrayComparator());
+ }
+
+ /**
+ * Construct an index spec by specifying everything.
+ *
+ * @param indexId
+ * @param indexedColumns
+ * @param additionalColumns
+ * @param keyGenerator
+ * @param keyComparator
+ */
+ public IndexSpecification(String indexId, byte[][] indexedColumns,
+ byte[][] additionalColumns, IndexKeyGenerator keyGenerator,
+ WritableComparator<byte[]> keyComparator) {
+ this.indexId = indexId;
+ this.indexedColumns = indexedColumns;
+ this.additionalColumns = additionalColumns;
+ this.keyGenerator = keyGenerator;
+ this.keyComparator = keyComparator;
+ this.makeAllColumns();
+ }
+
+ public IndexSpecification() {
+ // For writable
+ }
+
+ private void makeAllColumns() {
+ this.allColumns = new byte[indexedColumns.length
+ + (additionalColumns == null ? 0 : additionalColumns.length)][];
+ System.arraycopy(indexedColumns, 0, allColumns, 0, indexedColumns.length);
+ if (additionalColumns != null) {
+ System.arraycopy(additionalColumns, 0, allColumns, indexedColumns.length,
+ additionalColumns.length);
+ }
+ }
+
+ /**
+ * Get the indexedColumns.
+ *
+ * @return Return the indexedColumns.
+ */
+ public byte[][] getIndexedColumns() {
+ return indexedColumns;
+ }
+
+ /**
+ * Get the keyGenerator.
+ *
+ * @return Return the keyGenerator.
+ */
+ public IndexKeyGenerator getKeyGenerator() {
+ return keyGenerator;
+ }
+
+ /**
+ * Get the keyComparator.
+ *
+ * @return Return the keyComparator.
+ */
+ public WritableComparator<byte[]> getKeyComparator() {
+ return keyComparator;
+ }
+
+ /**
+ * Get the additionalColumns.
+ *
+ * @return Return the additionalColumns.
+ */
+ public byte[][] getAdditionalColumns() {
+ return additionalColumns;
+ }
+
+ /**
+ * Get the indexId.
+ *
+ * @return Return the indexId.
+ */
+ public String getIndexId() {
+ return indexId;
+ }
+
+ public byte[][] getAllColumns() {
+ return allColumns;
+ }
+
+ public boolean containsColumn(byte[] column) {
+ for (byte[] col : allColumns) {
+ if (Bytes.equals(column, col)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public byte[] getIndexedTableName(byte[] baseTableName) {
+ return Bytes.add(baseTableName, Bytes.toBytes("-" + indexId));
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ public void readFields(DataInput in) throws IOException {
+ indexId = in.readUTF();
+ int numIndexedCols = in.readInt();
+ indexedColumns = new byte[numIndexedCols][];
+ for (int i = 0; i < numIndexedCols; i++) {
+ indexedColumns[i] = Bytes.readByteArray(in);
+ }
+ int numAdditionalCols = in.readInt();
+ additionalColumns = new byte[numAdditionalCols][];
+ for (int i = 0; i < numAdditionalCols; i++) {
+ additionalColumns[i] = Bytes.readByteArray(in);
+ }
+ makeAllColumns();
+ HBaseConfiguration conf = new HBaseConfiguration();
+ keyGenerator = (IndexKeyGenerator) ObjectWritable.readObject(in, conf);
+ keyComparator = (WritableComparator<byte[]>) ObjectWritable.readObject(in,
+ conf);
+ }
+
+ /** {@inheritDoc} */
+ public void write(DataOutput out) throws IOException {
+ out.writeUTF(indexId);
+ out.writeInt(indexedColumns.length);
+ for (byte[] col : indexedColumns) {
+ Bytes.writeByteArray(out, col);
+ }
+ if (additionalColumns != null) {
+ out.writeInt(additionalColumns.length);
+ for (byte[] col : additionalColumns) {
+ Bytes.writeByteArray(out, col);
+ }
+ } else {
+ out.writeInt(0);
+ }
+ HBaseConfiguration conf = new HBaseConfiguration();
+ ObjectWritable
+ .writeObject(out, keyGenerator, IndexKeyGenerator.class, conf);
+ ObjectWritable.writeObject(out, keyComparator, WritableComparable.class,
+ conf);
+ }
+
+}
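
A sketch of the full constructor, assuming a hypothetical "byAge" index that keys on an attributes:age column and carries attributes:name along for index-side filtering:

    import org.apache.hadoop.hbase.client.tableindexed.IndexSpecification;
    import org.apache.hadoop.hbase.client.tableindexed.SimpleIndexKeyGenerator;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ByAgeSpecExample {
      public static IndexSpecification byAgeSpec() {
        byte[] ageCol = Bytes.toBytes("attributes:age");
        byte[] nameCol = Bytes.toBytes("attributes:name");
        // The indexed column forms the index row key; the additional column
        // is copied into the index table so index-side filters can use it.
        return new IndexSpecification(
            "byAge",
            new byte[][] { ageCol },
            new byte[][] { nameCol },
            new SimpleIndexKeyGenerator(ageCol),
            null); // null comparator keeps the default ascending byte order
      }
    }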
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java?rev=718317&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java Mon Nov 17 10:34:07 2008
@@ -0,0 +1,222 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.tableindexed;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scanner;
+import org.apache.hadoop.hbase.client.transactional.TransactionalTable;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.HbaseMapWritable;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/** HTable extended with indexed support. */
+public class IndexedTable extends TransactionalTable {
+
+ // FIXME, these belong elsewhere
+ public static final byte[] INDEX_COL_FAMILY_NAME = Bytes.toBytes("__INDEX__");
+ public static final byte[] INDEX_COL_FAMILY = Bytes.add(
+ INDEX_COL_FAMILY_NAME, new byte[] { HStoreKey.COLUMN_FAMILY_DELIMITER });
+ public static final byte[] INDEX_BASE_ROW_COLUMN = Bytes.add(
+ INDEX_COL_FAMILY, Bytes.toBytes("ROW"));
+
+ private static final Log LOG = LogFactory.getLog(IndexedTable.class);
+
+ private Map<String, HTable> indexIdToTable = new HashMap<String, HTable>();
+
+ /** {@inheritDoc} */
+ public IndexedTable(final HBaseConfiguration conf, final byte[] tableName)
+ throws IOException {
+ super(conf, tableName);
+
+ for (IndexSpecification spec : super.getTableDescriptor().getIndexes()) {
+ indexIdToTable.put(spec.getIndexId(), new HTable(conf, spec
+ .getIndexedTableName(tableName)));
+ }
+ }
+
+ /**
+ * Open up an indexed scanner. Results will come back in the indexed order,
+ * but will contain RowResults from the original table.
+ *
+ * @param indexId the id of the index to use
+ * @param indexStartRow (created from the IndexKeyGenerator)
+ * @param indexColumns in the index table
+ * @param indexFilter filter to run on the index'ed table. This can only use
+ * columns that have been added to the index.
+ * @param baseColumns from the original table
+ * @return scanner
+ * @throws IOException
+ * @throws IndexNotFoundException
+ */
+ public Scanner getIndexedScanner(String indexId, final byte[] indexStartRow,
+ byte[][] indexColumns, final RowFilterInterface indexFilter,
+ final byte[][] baseColumns) throws IOException, IndexNotFoundException {
+ IndexSpecification indexSpec = super.getTableDescriptor().getIndex(indexId);
+ if (indexSpec == null) {
+ throw new IndexNotFoundException("Index " + indexId
+ + " not defined in table "
+ + super.getTableDescriptor().getNameAsString());
+ }
+ verifyIndexColumns(indexColumns, indexSpec);
+ // TODO, verify/remove index columns from baseColumns
+
+ HTable indexTable = indexIdToTable.get(indexId);
+
+ byte[][] allIndexColumns;
+ if (indexColumns != null) {
+ allIndexColumns = new byte[indexColumns.length + 1][];
+ System
+ .arraycopy(indexColumns, 0, allIndexColumns, 0, indexColumns.length);
+ allIndexColumns[indexColumns.length] = INDEX_BASE_ROW_COLUMN;
+ } else {
+ byte[][] allColumns = indexSpec.getAllColumns();
+ allIndexColumns = new byte[allColumns.length + 1][];
+ System.arraycopy(allColumns, 0, allIndexColumns, 0, allColumns.length);
+ allIndexColumns[allColumns.length] = INDEX_BASE_ROW_COLUMN;
+ }
+
+ Scanner indexScanner = indexTable.getScanner(allIndexColumns,
+ indexStartRow, indexFilter);
+
+ return new ScannerWrapper(indexScanner, baseColumns);
+ }
+
+ private void verifyIndexColumns(byte[][] requestedColumns,
+ IndexSpecification indexSpec) {
+ if (requestedColumns == null) {
+ return;
+ }
+ for (byte[] requestedColumn : requestedColumns) {
+ boolean found = false;
+ for (byte[] indexColumn : indexSpec.getAllColumns()) {
+ if (Bytes.equals(requestedColumn, indexColumn)) {
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ throw new RuntimeException("Column [" + Bytes.toString(requestedColumn)
+ + "] not in index " + indexSpec.getIndexId());
+ }
+ }
+ }
+
+ private class ScannerWrapper implements Scanner {
+
+ private Scanner indexScanner;
+ private byte[][] columns;
+
+ public ScannerWrapper(Scanner indexScanner, byte[][] columns) {
+ this.indexScanner = indexScanner;
+ this.columns = columns;
+ }
+
+ /** {@inheritDoc} */
+ public RowResult next() throws IOException {
+ RowResult[] result = next(1);
+ if (result == null || result.length < 1)
+ return null;
+ return result[0];
+ }
+
+ /** {@inheritDoc} */
+ public RowResult[] next(int nbRows) throws IOException {
+ RowResult[] indexResult = indexScanner.next(nbRows);
+ if (indexResult == null) {
+ return null;
+ }
+ RowResult[] result = new RowResult[indexResult.length];
+ for (int i = 0; i < indexResult.length; i++) {
+ RowResult row = indexResult[i];
+ byte[] baseRow = row.get(INDEX_BASE_ROW_COLUMN).getValue();
+ LOG.debug("next index row [" + Bytes.toString(row.getRow())
+ + "] -> base row [" + Bytes.toString(baseRow) + "]");
+ HbaseMapWritable<byte[], Cell> colValues =
+ new HbaseMapWritable<byte[], Cell>();
+ if (columns != null && columns.length > 0) {
+ LOG.debug("Going to base table for remaining columns");
+ RowResult baseResult = IndexedTable.this.getRow(baseRow, columns);
+ colValues.putAll(baseResult);
+ }
+ for (Entry<byte[], Cell> entry : row.entrySet()) {
+ byte[] col = entry.getKey();
+ if (HStoreKey.matchingFamily(INDEX_COL_FAMILY_NAME, col)) {
+ continue;
+ }
+ colValues.put(col, entry.getValue());
+ }
+ result[i] = new RowResult(baseRow, colValues);
+ }
+ return result;
+ }
+
+ /** {@inheritDoc} */
+ public void close() {
+ indexScanner.close();
+ }
+
+ /** {@inheritDoc} */
+ public Iterator<RowResult> iterator() {
+ // FIXME, copied from HTable.ClientScanner. Extract this to common base
+ // class?
+ return new Iterator<RowResult>() {
+ RowResult next = null;
+
+ public boolean hasNext() {
+ if (next == null) {
+ try {
+ next = ScannerWrapper.this.next();
+ return next != null;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return true;
+ }
+
+ public RowResult next() {
+ if (!hasNext()) {
+ return null;
+ }
+ RowResult temp = next;
+ next = null;
+ return temp;
+ }
+
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ }
+}
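
A hedged end-to-end sketch of scanning, assuming a hypothetical "people" table created with the "byAge" index (see the IndexedTableAdmin sketch below); results come back in index order but hold base-table rows:

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.client.Scanner;
    import org.apache.hadoop.hbase.client.tableindexed.IndexedTable;
    import org.apache.hadoop.hbase.io.RowResult;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ScanByAgeExample {
      public static void main(String[] args) throws Exception {
        HBaseConfiguration conf = new HBaseConfiguration();
        IndexedTable table = new IndexedTable(conf, Bytes.toBytes("people"));
        Scanner scanner = table.getIndexedScanner(
            "byAge",
            HConstants.EMPTY_START_ROW, // scan the whole index
            null,                       // all index columns
            null,                       // no index-side filter
            new byte[][] { Bytes.toBytes("attributes:email") }); // from base table
        try {
          // Each RowResult is keyed by the base row, merged from the index
          // table columns plus the requested base-table columns.
          for (RowResult row : scanner) {
            System.out.println(Bytes.toString(row.getRow()));
          }
        } finally {
          scanner.close();
        }
      }
    }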
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTableAdmin.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTableAdmin.java?rev=718317&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTableAdmin.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTableAdmin.java Mon Nov 17 10:34:07 2008
@@ -0,0 +1,99 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.tableindexed;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.hadoop.hbase.ColumnNameParseException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Extension of HBaseAdmin that creates indexed tables.
+ *
+ */
+public class IndexedTableAdmin extends HBaseAdmin {
+
+ /**
+ * Constructor
+ *
+ * @param conf Configuration object
+ * @throws MasterNotRunningException
+ */
+ public IndexedTableAdmin(HBaseConfiguration conf)
+ throws MasterNotRunningException {
+ super(conf);
+ }
+
+ /**
+ * Creates a new table
+ *
+ * @param desc table descriptor for table
+ *
+ * @throws IllegalArgumentException if the table name is reserved
+ * @throws MasterNotRunningException if master is not running
+ * @throws TableExistsException if table already exists (If concurrent
+ * threads, the table may have been created between test-for-existence and
+ * attempt-at-creation).
+ * @throws IOException
+ */
+ @Override
+ public void createTable(HTableDescriptor desc) throws IOException {
+ super.createTable(desc);
+ this.createIndexTables(desc);
+ }
+
+ private void createIndexTables(HTableDescriptor tableDesc) throws IOException {
+ byte[] baseTableName = tableDesc.getName();
+ for (IndexSpecification indexSpec : tableDesc.getIndexes()) {
+ HTableDescriptor indexTableDesc = createIndexTableDesc(baseTableName,
+ indexSpec);
+ super.createTable(indexTableDesc);
+ }
+ }
+
+ private HTableDescriptor createIndexTableDesc(byte[] baseTableName,
+ IndexSpecification indexSpec) throws ColumnNameParseException {
+ HTableDescriptor indexTableDesc = new HTableDescriptor(indexSpec
+ .getIndexedTableName(baseTableName));
+ Set<byte[]> families = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+ families.add(IndexedTable.INDEX_COL_FAMILY);
+ for (byte[] column : indexSpec.getAllColumns()) {
+ families.add(Bytes.add(HStoreKey.getFamily(column),
+ new byte[] { HStoreKey.COLUMN_FAMILY_DELIMITER }));
+ }
+
+ for (byte[] colFamily : families) {
+ indexTableDesc.addFamily(new HColumnDescriptor(colFamily));
+ }
+
+ indexTableDesc.setRowKeyComparator(indexSpec.getKeyComparator());
+
+ return indexTableDesc;
+ }
+}
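
Creating such a table end to end might look like the following sketch, assuming the String-name HTableDescriptor/HColumnDescriptor constructors; the table, family, and index names are hypothetical. createTable() builds the base table plus one "<base>-<indexId>" table per index:

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.client.tableindexed.IndexSpecification;
    import org.apache.hadoop.hbase.client.tableindexed.IndexedTableAdmin;
    import org.apache.hadoop.hbase.util.Bytes;

    public class CreateIndexedTableExample {
      public static void main(String[] args) throws Exception {
        HBaseConfiguration conf = new HBaseConfiguration();
        HTableDescriptor desc = new HTableDescriptor("people");
        desc.addFamily(new HColumnDescriptor("attributes:"));
        // Simple ascending single-column index, via the convenience
        // IndexSpecification constructor.
        desc.addIndex(new IndexSpecification("byAge",
            Bytes.toBytes("attributes:age"), true));
        // Creates "people" plus the derived index table "people-byAge".
        new IndexedTableAdmin(conf).createTable(desc);
      }
    }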
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/tableindexed/ReverseByteArrayComparator.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/tableindexed/ReverseByteArrayComparator.java?rev=718317&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/tableindexed/ReverseByteArrayComparator.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/tableindexed/ReverseByteArrayComparator.java Mon Nov 17 10:34:07 2008
@@ -0,0 +1,46 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.tableindexed;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.WritableComparator;
+import org.apache.hadoop.hbase.util.Bytes;
+
+public class ReverseByteArrayComparator implements WritableComparator<byte[]> {
+
+ /** {@inheritDoc} */
+ public int compare(byte[] o1, byte[] o2) {
+ return Bytes.compareTo(o2, o1);
+ }
+
+
+ /** {@inheritDoc} */
+ public void readFields(DataInput arg0) throws IOException {
+ // Nothing
+ }
+
+ /** {@inheritDoc} */
+ public void write(DataOutput arg0) throws IOException {
+ // Nothing
+ }
+}
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/tableindexed/SimpleIndexKeyGenerator.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/tableindexed/SimpleIndexKeyGenerator.java?rev=718317&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/tableindexed/SimpleIndexKeyGenerator.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/tableindexed/SimpleIndexKeyGenerator.java Mon Nov 17 10:34:07 2008
@@ -0,0 +1,59 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.tableindexed;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+/** Creates index keys for a single column by prepending the column's
+ * value to the base row key.
+ */
+public class SimpleIndexKeyGenerator implements IndexKeyGenerator {
+
+ private byte [] column;
+
+ public SimpleIndexKeyGenerator(byte [] column) {
+ this.column = column;
+ }
+
+ public SimpleIndexKeyGenerator() {
+ // For Writable
+ }
+
+ /** {@inheritDoc} */
+ public byte[] createIndexKey(byte[] rowKey, Map<byte[], byte[]> columns) {
+ return Bytes.add(columns.get(column), rowKey);
+ }
+
+ /** {@inheritDoc} */
+ public void readFields(DataInput in) throws IOException {
+ column = Bytes.readByteArray(in);
+ }
+
+ /** {@inheritDoc} */
+ public void write(DataOutput out) throws IOException {
+ Bytes.writeByteArray(out, column);
+ }
+
+}
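
A small worked example, with hypothetical column, value, and row names, showing the key this generator produces:

    import java.util.SortedMap;
    import java.util.TreeMap;

    import org.apache.hadoop.hbase.client.tableindexed.SimpleIndexKeyGenerator;
    import org.apache.hadoop.hbase.util.Bytes;

    public class SimpleKeyExample {
      public static void main(String[] args) {
        byte[] ageCol = Bytes.toBytes("attributes:age");
        SimpleIndexKeyGenerator g = new SimpleIndexKeyGenerator(ageCol);
        SortedMap<byte[], byte[]> cols =
            new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
        cols.put(ageCol, Bytes.toBytes("042"));
        // The column value is prepended to the base row key, so index rows
        // sort by value and stay unique per base row: "042" + "row1".
        byte[] indexKey = g.createIndexKey(Bytes.toBytes("row1"), cols);
        System.out.println(Bytes.toString(indexKey)); // prints 042row1
      }
    }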
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/tableindexed/package.html
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/tableindexed/package.html?rev=718317&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/tableindexed/package.html (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/tableindexed/package.html Mon Nov 17 10:34:07 2008
@@ -0,0 +1,47 @@
+<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
+<html>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<head />
+<body bgcolor="white">
+
+This package provides support for secondary indexing by maintaining a separate, "index", table for each index.
+
+The IndexSpecification class provides the metadata for the index. This includes:
+<ul>
+<li> the columns that contribute to the index key,</li>
+<li> additional columns to put in the index table (these are thus made available to filters on the index table),</li>
+<li> an IndexKeyGenerator which constructs the index-row-key from the indexed column(s) and the original row,</li>
+<li> (optionally) a custom key comparator for the indexed table, which can allow an index on a deserialized column value.</li>
+</ul>
+
+IndexSpecifications can be added to a table's metadata (HTableDescriptor) before the table is constructed.
+Afterwards, updates and deletes to the original table will trigger corresponding updates in the index, and
+the indexes can be scanned using the API on IndexedTable.
+
+For a simple example, look at the unit test in org.apache.hadoop.hbase.client.tableindexed.
+
+<p> To enable the indexing, modify hbase-site.xml to turn on the
+IndexedRegionServer. This is done by setting
+<i>hbase.regionserver.class</i> to
+<i>org.apache.hadoop.hbase.ipc.IndexedRegionInterface</i> and
+<i>hbase.regionserver.impl</i> to
+<i>org.apache.hadoop.hbase.regionserver.tableindexed.IndexedRegionServer</i>
+
+</body>
+</html>
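
Concretely, the hbase-site.xml stanza described in the package documentation above would look like this sketch:

    <property>
      <name>hbase.regionserver.class</name>
      <value>org.apache.hadoop.hbase.ipc.IndexedRegionInterface</value>
    </property>
    <property>
      <name>hbase.regionserver.impl</name>
      <value>org.apache.hadoop.hbase.regionserver.tableindexed.IndexedRegionServer</value>
    </property>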
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/IndexedRegionInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/IndexedRegionInterface.java?rev=718317&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/IndexedRegionInterface.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/ipc/IndexedRegionInterface.java Mon Nov 17 10:34:07 2008
@@ -0,0 +1,11 @@
+/*
+ * $Id$
+ * Created on Sep 10, 2008
+ *
+ */
+package org.apache.hadoop.hbase.ipc;
+
+/** Interface for the indexed region server. */
+public interface IndexedRegionInterface extends TransactionalRegionInterface {
+ // No methods for now...
+}
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java?rev=718317&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java Mon Nov 17 10:34:07 2008
@@ -0,0 +1,342 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.tableindexed;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.tableindexed.IndexSpecification;
+import org.apache.hadoop.hbase.client.tableindexed.IndexedTable;
+import org.apache.hadoop.hbase.io.BatchOperation;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.regionserver.FlushRequester;
+import org.apache.hadoop.hbase.regionserver.HLog;
+import org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegion;
+import org.apache.hadoop.hbase.util.Bytes;
+
+class IndexedRegion extends TransactionalRegion {
+
+ private static final Log LOG = LogFactory.getLog(IndexedRegion.class);
+
+ private final HBaseConfiguration conf;
+ private Map<IndexSpecification, HTable> indexSpecToTable = new HashMap<IndexSpecification, HTable>();
+
+ public IndexedRegion(final Path basedir, final HLog log, final FileSystem fs,
+ final HBaseConfiguration conf, final HRegionInfo regionInfo,
+ final FlushRequester flushListener) {
+ super(basedir, log, fs, conf, regionInfo, flushListener);
+ this.conf = conf;
+ }
+
+ private synchronized HTable getIndexTable(IndexSpecification index)
+ throws IOException {
+ HTable indexTable = indexSpecToTable.get(index);
+ if (indexTable == null) {
+ indexTable = new HTable(conf, index.getIndexedTableName(super
+ .getRegionInfo().getTableDesc().getName()));
+ indexSpecToTable.put(index, indexTable);
+ }
+ return indexTable;
+ }
+
+ private Collection<IndexSpecification> getIndexes() {
+ return super.getRegionInfo().getTableDesc().getIndexes();
+ }
+
+ /**
+ * @param batchUpdate
+ * @param lockid
+ * @param writeToWAL if true, then we write this update to the log
+ * @throws IOException
+ */
+ @Override
+ public void batchUpdate(BatchUpdate batchUpdate, Integer lockid, boolean writeToWAL)
+ throws IOException {
+    updateIndexes(batchUpdate); // Do this first because we will want to see the old row
+ super.batchUpdate(batchUpdate, lockid, writeToWAL);
+ }
+
+ private void updateIndexes(BatchUpdate batchUpdate) throws IOException {
+ List<IndexSpecification> indexesToUpdate = new LinkedList<IndexSpecification>();
+
+ // Find the indexes we need to update
+ for (IndexSpecification index : getIndexes()) {
+ if (possiblyAppliesToIndex(index, batchUpdate)) {
+ indexesToUpdate.add(index);
+ }
+ }
+
+ if (indexesToUpdate.size() == 0) {
+ return;
+ }
+
+ Set<byte[]> neededColumns = getColumnsForIndexes(indexesToUpdate);
+
+ SortedMap<byte[], byte[]> newColumnValues = getColumnsFromBatchUpdate(batchUpdate);
+ Map<byte[], Cell> oldColumnCells = super.getFull(batchUpdate.getRow(),
+ neededColumns, HConstants.LATEST_TIMESTAMP, null);
+
+ // Handle delete batch updates. Go back and get the next older values
+ for (BatchOperation op : batchUpdate) {
+ if (!op.isPut()) {
+ Cell current = oldColumnCells.get(op.getColumn());
+ if (current != null) {
+ Cell [] older = super.get(batchUpdate.getRow(), op.getColumn(), current.getTimestamp(), 1);
+ if (older != null && older.length > 0) {
+ newColumnValues.put(op.getColumn(), older[0].getValue());
+ }
+ }
+ }
+ }
+
+ // Add the old values to the new if they are not there
+ for (Entry<byte[], Cell> oldEntry : oldColumnCells.entrySet()) {
+ if (!newColumnValues.containsKey(oldEntry.getKey())) {
+ newColumnValues.put(oldEntry.getKey(), oldEntry.getValue().getValue());
+ }
+ }
+
+
+
+ Iterator<IndexSpecification> indexIterator = indexesToUpdate.iterator();
+ while (indexIterator.hasNext()) {
+ IndexSpecification indexSpec = indexIterator.next();
+ if (!doesApplyToIndex(indexSpec, newColumnValues)) {
+ indexIterator.remove();
+ }
+ }
+
+ SortedMap<byte[], byte[]> oldColumnValues = convertToValueMap(oldColumnCells);
+
+ for (IndexSpecification indexSpec : indexesToUpdate) {
+ removeOldIndexEntry(indexSpec, batchUpdate.getRow(), oldColumnValues);
+ updateIndex(indexSpec, batchUpdate.getRow(), newColumnValues);
+ }
+ }
+
+ /** Return the columns needed for the update. */
+ private Set<byte[]> getColumnsForIndexes(Collection<IndexSpecification> indexes) {
+ Set<byte[]> neededColumns = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+ for (IndexSpecification indexSpec : indexes) {
+ for (byte[] col : indexSpec.getAllColumns()) {
+ neededColumns.add(col);
+ }
+ }
+ return neededColumns;
+ }
+
+ private void removeOldIndexEntry(IndexSpecification indexSpec, byte[] row,
+ SortedMap<byte[], byte[]> oldColumnValues) throws IOException {
+ for (byte[] indexedCol : indexSpec.getIndexedColumns()) {
+ if (!oldColumnValues.containsKey(indexedCol)) {
+ LOG.debug("Index [" + indexSpec.getIndexId()
+ + "] not trying to remove old entry for row ["
+ + Bytes.toString(row) + "] because col ["
+ + Bytes.toString(indexedCol) + "] is missing");
+ return;
+ }
+ }
+
+ byte[] oldIndexRow = indexSpec.getKeyGenerator().createIndexKey(row,
+ oldColumnValues);
+ LOG.debug("Index [" + indexSpec.getIndexId() + "] removing old entry ["
+ + Bytes.toString(oldIndexRow) + "]");
+ getIndexTable(indexSpec).deleteAll(oldIndexRow);
+ }
+
+ private SortedMap<byte[], byte[]> getColumnsFromBatchUpdate(BatchUpdate b) {
+ SortedMap<byte[], byte[]> columnValues = new TreeMap<byte[], byte[]>(
+ Bytes.BYTES_COMPARATOR);
+ for (BatchOperation op : b) {
+ if (op.isPut()) {
+ columnValues.put(op.getColumn(), op.getValue());
+ }
+ }
+ return columnValues;
+ }
+
+ /** Ask if this update *could* apply to the index. It may actually apply if some of the columns needed are missing.
+ *
+ * @param indexSpec
+ * @param b
+   * @return true if the update might apply to the index.
+ */
+ private boolean possiblyAppliesToIndex(IndexSpecification indexSpec, BatchUpdate b) {
+ for (BatchOperation op : b) {
+ if (indexSpec.containsColumn(op.getColumn())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+  /** Ask if this update does apply to the index.
+   *
+   * @param indexSpec
+   * @param columnValues
+   * @return true if the update applies to the index.
+   */
+ private boolean doesApplyToIndex(IndexSpecification indexSpec, SortedMap<byte[], byte[]> columnValues) {
+
+ for (byte [] neededCol : indexSpec.getIndexedColumns()) {
+ if (! columnValues.containsKey(neededCol)) {
+ LOG.debug("Index [" + indexSpec.getIndexId() + "] can't be updated because ["
+ + Bytes.toString(neededCol) + "] is missing");
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private void updateIndex(IndexSpecification indexSpec, byte[] row,
+ SortedMap<byte[], byte[]> columnValues) throws IOException {
+ BatchUpdate indexUpdate = createIndexUpdate(indexSpec, row, columnValues);
+ getIndexTable(indexSpec).commit(indexUpdate);
+ LOG.debug("Index [" + indexSpec.getIndexId() + "] adding new entry ["
+ + Bytes.toString(indexUpdate.getRow()) + "] for row ["
+ + Bytes.toString(row) + "]");
+
+ }
+
+ private BatchUpdate createIndexUpdate(IndexSpecification indexSpec,
+ byte[] row, SortedMap<byte[], byte[]> columnValues) {
+ byte[] indexRow = indexSpec.getKeyGenerator().createIndexKey(row,
+ columnValues);
+ BatchUpdate update = new BatchUpdate(indexRow);
+
+ update.put(IndexedTable.INDEX_BASE_ROW_COLUMN, row);
+
+ for (byte[] col : indexSpec.getIndexedColumns()) {
+ byte[] val = columnValues.get(col);
+ if (val == null) {
+ throw new RuntimeException("Unexpected missing column value. ["+Bytes.toString(col)+"]");
+ }
+ update.put(col, val);
+ }
+
+ for (byte [] col : indexSpec.getAdditionalColumns()) {
+ byte[] val = columnValues.get(col);
+ if (val != null) {
+ update.put(col, val);
+ }
+ }
+
+ return update;
+ }
+
+ @Override
+ public void deleteAll(final byte[] row, final long ts, final Integer lockid)
+ throws IOException {
+
+ if (getIndexes().size() != 0) {
+
+ // Need all columns
+ Set<byte[]> neededColumns = getColumnsForIndexes(getIndexes());
+
+ Map<byte[], Cell> oldColumnCells = super.getFull(row,
+ neededColumns, HConstants.LATEST_TIMESTAMP, null);
+ SortedMap<byte[], byte[]> oldColumnValues = convertToValueMap(oldColumnCells);
+
+
+ for (IndexSpecification indexSpec : getIndexes()) {
+ removeOldIndexEntry(indexSpec, row, oldColumnValues);
+ }
+
+ // Handle if there is still a version visible.
+ if (ts != HConstants.LATEST_TIMESTAMP) {
+ Map<byte[], Cell> currentColumnCells = super.getFull(row,
+ neededColumns, ts, null);
+ SortedMap<byte[], byte[]> currentColumnValues = convertToValueMap(currentColumnCells);
+
+ for (IndexSpecification indexSpec : getIndexes()) {
+ if (doesApplyToIndex(indexSpec, currentColumnValues)) {
+ updateIndex(indexSpec, row, currentColumnValues);
+ }
+ }
+ }
+ }
+ super.deleteAll(row, ts, lockid);
+ }
+
+ private SortedMap<byte[], byte[]> convertToValueMap(
+ Map<byte[], Cell> cellMap) {
+ SortedMap<byte[], byte[]> currentColumnValues = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
+ for(Entry<byte[], Cell> entry : cellMap.entrySet()) {
+ currentColumnValues.put(entry.getKey(), entry.getValue().getValue());
+ }
+ return currentColumnValues;
+ }
+
+ @Override
+ public void deleteAll(final byte[] row, byte[] column, final long ts,
+ final Integer lockid) throws IOException {
+ List<IndexSpecification> indexesToUpdate = new LinkedList<IndexSpecification>();
+
+ for(IndexSpecification indexSpec : getIndexes()) {
+ if (indexSpec.containsColumn(column)) {
+ indexesToUpdate.add(indexSpec);
+ }
+ }
+
+ Set<byte[]> neededColumns = getColumnsForIndexes(indexesToUpdate);
+ Map<byte[], Cell> oldColumnCells = super.getFull(row,
+ neededColumns, HConstants.LATEST_TIMESTAMP, null);
+ SortedMap<byte [], byte[]> oldColumnValues = convertToValueMap(oldColumnCells);
+
+ for (IndexSpecification indexSpec : indexesToUpdate) {
+ removeOldIndexEntry(indexSpec, row, oldColumnValues);
+ }
+
+ // Handle if there is still a version visible.
+ if (ts != HConstants.LATEST_TIMESTAMP) {
+ Map<byte[], Cell> currentColumnCells = super.getFull(row,
+ neededColumns, ts, null);
+ SortedMap<byte[], byte[]> currentColumnValues = convertToValueMap(currentColumnCells);
+
+ for (IndexSpecification indexSpec : getIndexes()) {
+ if (doesApplyToIndex(indexSpec, currentColumnValues)) {
+ updateIndex(indexSpec, row, currentColumnValues);
+ }
+ }
+ }
+
+ super.deleteAll(row, column, ts, lockid);
+ }
+
+}
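
A hypothetical trace of the maintenance sequence above, reusing the "byAge" index and SimpleIndexKeyGenerator from the earlier sketches:

    // Base update: row "row1", attributes:age changes "041" -> "042".
    // 1. batchUpdate() calls updateIndexes() before the base write, so
    //    getFull() still sees the old value "041".
    // 2. removeOldIndexEntry() deletes index row "041row1" from "people-byAge".
    // 3. updateIndex() commits index row "042row1", whose __INDEX__:ROW cell
    //    points back at base row "row1".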
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegionServer.java?rev=718317&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegionServer.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegionServer.java Mon Nov 17 10:34:07 2008
@@ -0,0 +1,66 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.tableindexed;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.ipc.IndexedRegionInterface;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegion;
+import org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionServer;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * RegionServer which maintains secondary indexes.
+ *
+ **/
+public class IndexedRegionServer extends TransactionalRegionServer implements
+ IndexedRegionInterface {
+
+ public IndexedRegionServer(HBaseConfiguration conf) throws IOException {
+ this(new HServerAddress(conf.get(REGIONSERVER_ADDRESS,
+ DEFAULT_REGIONSERVER_ADDRESS)), conf);
+ }
+
+ public IndexedRegionServer(HServerAddress serverAddress,
+ HBaseConfiguration conf) throws IOException {
+ super(serverAddress, conf);
+ }
+
+ @Override
+ protected HRegion instantiateRegion(final HRegionInfo regionInfo)
+ throws IOException {
+ HRegion r = new IndexedRegion(
+ HTableDescriptor.getTableDir(super.getRootDir(),
+ regionInfo.getTableDesc().getName()),
+ super.log, super.getFileSystem(), super.conf, regionInfo,
+ super.getFlushRequester());
+ r.initialize(null, new Progressable() {
+ public void progress() {
+ addProcessingMessage(regionInfo);
+ }
+ });
+ return r;
+ }
+
+}
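Nothing in this class registers itself; the cluster runs it only if the configuration names it as the region server implementation. A minimal sketch of the wiring, mirroring what the test below does (an hbase-site.xml entry for the same key would be the deployment-time equivalent):

    HBaseConfiguration conf = new HBaseConfiguration();
    // HConstants.REGION_SERVER_IMPL names the region server class the
    // cluster instantiates; point it at the indexed variant.
    conf.set(HConstants.REGION_SERVER_IMPL,
        IndexedRegionServer.class.getName());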
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java?rev=718317&r1=718316&r2=718317&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java Mon Nov 17 10:34:07 2008
@@ -73,7 +73,7 @@
* will have to consult the transaction log to determine the final decision of
* the transaction. This is not yet implemented.
*/
-class TransactionalRegion extends HRegion {
+public class TransactionalRegion extends HRegion {
private static final String LEASE_TIME = "hbase.transaction.leaseTime";
private static final int DEFAULT_LEASE_TIME = 60 * 1000;
@@ -501,7 +501,7 @@
}
for (BatchUpdate update : state.getWriteSet()) {
- super.batchUpdate(update, false); // Don't need to WAL these
+ this.batchUpdate(update, false); // Don't need to WAL these
// FIXME: maybe these should be WALed so we don't need to look so far back.
}
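The change from super.batchUpdate to this.batchUpdate is what makes the new subclassing work: super. binds statically to HRegion and would bypass IndexedRegion's override during commit replay, while this. dispatches virtually, so index maintenance still runs. An illustrative sketch of the distinction in plain Java (Base and Sub are stand-ins, not HBase classes):

    class Base {
      void update() { System.out.println("base"); }
    }
    class Sub extends Base {
      @Override
      void update() { System.out.println("sub: index maintained"); }
      void commitViaSuper() { super.update(); } // prints "base"
      void commitViaThis() { this.update(); }   // prints "sub: index maintained"
    }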
Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java?rev=718317&r1=718316&r2=718317&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java Mon Nov 17 10:34:07 2008
@@ -527,7 +527,7 @@
* @return Zero-prefixed, 10-byte-wide decimal version of the passed
* number (uses the absolute value if the number is negative).
*/
- static byte [] format(final int number) {
+ public static byte [] format(final int number) {
byte [] b = new byte[10];
int d = Math.abs(number);
for (int i = b.length - 1; i >= 0; i--) {
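The hunk ends mid-method, but the javadoc pins down the contract, and widening the method to public is what lets the new index test reuse it for row keys. A usage sketch of the documented behavior:

    // format() pads to 10 decimal digits and takes the absolute value,
    // so 42 and -42 produce the same key by design.
    byte[] a = PerformanceEvaluation.format(42);   // bytes of "0000000042"
    byte[] b = PerformanceEvaluation.format(-42);  // also "0000000042"
    assert Bytes.toString(a).equals("0000000042");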
Added: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/tableindexed/TestIndexedTable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/tableindexed/TestIndexedTable.java?rev=718317&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/tableindexed/TestIndexedTable.java (added)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/tableindexed/TestIndexedTable.java Mon Nov 17 10:34:07 2008
@@ -0,0 +1,131 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.tableindexed;
+
+import java.io.IOException;
+import java.util.Random;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseClusterTestCase;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.PerformanceEvaluation;
+import org.apache.hadoop.hbase.client.Scanner;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.regionserver.tableindexed.IndexedRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+
+public class TestIndexedTable extends HBaseClusterTestCase {
+
+ private static final Log LOG = LogFactory.getLog(TestIndexedTable.class);
+
+ private static final String TABLE_NAME = "table1";
+
+ private static final byte[] FAMILY = Bytes.toBytes("family:");
+ private static final byte[] COL_A = Bytes.toBytes("family:a");
+ private static final String INDEX_COL_A_ASC = "A-Ascending";
+
+ private static final int NUM_ROWS = 10;
+ private static final int MAX_VAL = 10000;
+
+ private IndexedTableAdmin admin;
+ private IndexedTable table;
+ private Random random = new Random();
+
+ /** constructor */
+ public TestIndexedTable() {
+ conf.set(HConstants.REGION_SERVER_IMPL,
+ IndexedRegionServer.class.getName());
+ conf.setInt("hbase.master.info.port", -1);
+ conf.setInt("hbase.regionserver.info.port", -1);
+ }
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+
+ HTableDescriptor desc = new HTableDescriptor(TABLE_NAME);
+ desc.addFamily(new HColumnDescriptor(FAMILY));
+
+ // Create a new index that does lexicographic ordering on COL_A
+ IndexSpecification colAIndex = new IndexSpecification(INDEX_COL_A_ASC,
+ COL_A, true);
+ desc.addIndex(colAIndex);
+
+ admin = new IndexedTableAdmin(conf);
+ admin.createTable(desc);
+ table = new IndexedTable(conf, desc.getName());
+ }
+
+ private void writeInitialRows() throws IOException {
+ for (int i = 0; i < NUM_ROWS; i++) {
+ BatchUpdate update = new BatchUpdate(PerformanceEvaluation.format(i));
+ byte[] colA = PerformanceEvaluation.format(random.nextInt(MAX_VAL));
+ update.put(COL_A, colA);
+ table.commit(update);
+ LOG.info("Inserted row [" + Bytes.toString(update.getRow()) + "] val: ["
+ + Bytes.toString(colA) + "]");
+ }
+ }
+
+ public void testInitialWrites() throws IOException {
+ writeInitialRows();
+ assertRowsInOrder(NUM_ROWS);
+ }
+
+ private void assertRowsInOrder(int numRowsExpected) throws IndexNotFoundException, IOException {
+ Scanner scanner = table.getIndexedScanner(INDEX_COL_A_ASC,
+ HConstants.EMPTY_START_ROW, null, null, null);
+ int numRows = 0;
+ byte[] lastColA = null;
+ for (RowResult rowResult : scanner) {
+ byte[] colA = rowResult.get(COL_A).getValue();
+ LOG.info("index scan : row [" + Bytes.toString(rowResult.getRow())
+ + "] value [" + Bytes.toString(colA) + "]");
+ if (lastColA != null) {
+ Assert.assertTrue(Bytes.compareTo(lastColA, colA) <= 0);
+ }
+ lastColA = colA;
+ numRows++;
+ }
+ Assert.assertEquals(numRowsExpected, numRows);
+ }
+
+ public void testMultipleWrites() throws IOException {
+ writeInitialRows();
+ writeInitialRows(); // Update the rows.
+ assertRowsInOrder(NUM_ROWS);
+ }
+
+ public void testDelete() throws IOException {
+ writeInitialRows();
+ // Delete the first row.
+ table.deleteAll(PerformanceEvaluation.format(0));
+
+ assertRowsInOrder(NUM_ROWS - 1);
+ }
+
+}
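Pulling the pieces of this test together, a client-side sketch of an index scan; the three trailing null arguments simply mirror the test's call, since their semantics are outside this hunk:

    IndexedTable table = new IndexedTable(conf, Bytes.toBytes("table1"));
    Scanner scanner = table.getIndexedScanner("A-Ascending",
        HConstants.EMPTY_START_ROW, null, null, null);
    for (RowResult result : scanner) {
      // Rows come back ordered by the value of family:a, not by row key.
      byte[] value = result.get(Bytes.toBytes("family:a")).getValue();
    }
    scanner.close();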