You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by se...@apache.org on 2010/02/21 23:59:20 UTC
svn commit: r912434 [1/2] - in
/directory/sandbox/seelmann/hbase-partition/src:
main/java/org/apache/directory/server/core/partition/hbase/
main/java/org/apache/directory/server/core/partition/hbase/cursor/
main/java/org/apache/directory/server/core/pa...
Author: seelmann
Date: Sun Feb 21 22:59:19 2010
New Revision: 912434
URL: http://svn.apache.org/viewvc?rev=912434&view=rev
Log:
o added Map/Reduce job for LDIF import and indexing
o added pool for HTable objects
o removed counters for indices (they don't scale)
Added:
directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/IndexBuilder.java
directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifImporter.java
directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifInputFormat.java
directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifRecordReader.java
directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseTablePool.java
directory/sandbox/seelmann/hbase-partition/src/test/java/org/apache/directory/server/core/partition/hbase/it/mapreduce/
directory/sandbox/seelmann/hbase-partition/src/test/java/org/apache/directory/server/core/partition/hbase/it/mapreduce/GetPerformanceEvaluation.java
directory/sandbox/seelmann/hbase-partition/src/test/java/org/apache/directory/server/core/partition/hbase/it/mapreduce/LdifImportAndIndexIT.java
directory/sandbox/seelmann/hbase-partition/src/test/java/org/apache/directory/server/core/partition/hbase/it/mapreduce/RemoteLdifImport.java
directory/sandbox/seelmann/hbase-partition/src/test/java/org/apache/directory/server/core/partition/hbase/it/mapreduce/RemoteRunner.java
directory/sandbox/seelmann/hbase-partition/src/test/resources/testdata-5.ldif
Removed:
directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/IndexMapper.java
directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/RowCounterMapper.java
directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/SimpleMapper.java
Modified:
directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/HBaseStore.java
directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/cursor/HBaseUserColumnIndexCursor.java
directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBasePresenceIndex.java
directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserColumnIndex.java
directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserIndex.java
directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserRowIndex.java
directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseColumnIndexTable.java
directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseIndexTable.java
directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseIndexTableBase.java
directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseMasterTable.java
directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBasePresenceIndexTable.java
directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseRowIndexTable.java
directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseTableHelper.java
directory/sandbox/seelmann/hbase-partition/src/test/java/org/apache/directory/server/core/partition/hbase/HBaseClusterTestCaseAdapter.java
directory/sandbox/seelmann/hbase-partition/src/test/java/org/apache/directory/server/core/partition/hbase/HBaseDistributedRunner.java
directory/sandbox/seelmann/hbase-partition/src/test/java/org/apache/directory/server/core/partition/hbase/HBaseEmbeddedRunner.java
directory/sandbox/seelmann/hbase-partition/src/test/java/org/apache/directory/server/core/partition/hbase/table/AbstractHBaseTableTest.java
directory/sandbox/seelmann/hbase-partition/src/test/java/org/apache/directory/server/core/partition/hbase/table/HBaseIndexTableTest.java
directory/sandbox/seelmann/hbase-partition/src/test/java/org/apache/directory/server/core/partition/hbase/table/HBasePresenceIndexTableTest.java
directory/sandbox/seelmann/hbase-partition/src/test/resources/log4j.properties
Modified: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/HBaseStore.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/HBaseStore.java?rev=912434&r1=912433&r2=912434&view=diff
==============================================================================
--- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/HBaseStore.java (original)
+++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/HBaseStore.java Sun Feb 21 22:59:19 2010
@@ -22,6 +22,7 @@
import java.io.File;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -39,7 +40,7 @@
import org.apache.directory.server.core.partition.hbase.index.HBaseSubAliasIndex;
import org.apache.directory.server.core.partition.hbase.index.HBaseSubLevelIndex;
import org.apache.directory.server.core.partition.hbase.index.HBaseUserIndex;
-import org.apache.directory.server.core.partition.hbase.table.HBaseIndexTableBase;
+import org.apache.directory.server.core.partition.hbase.table.HBaseIndexTable;
import org.apache.directory.server.core.partition.hbase.table.HBaseMasterTable;
import org.apache.directory.server.xdbm.Index;
import org.apache.directory.server.xdbm.IndexCursor;
@@ -67,6 +68,8 @@
public class HBaseStore implements Store<ServerEntry>
{
+ public static final String STRONG_CONSISTENCY_PROPERTY = "org.apache.directory.strong.consistency";
+
private String tablePrefix;
private LdapDN suffixDn;
@@ -80,8 +83,8 @@
private HBaseMasterTable masterTable;
- private Map<String, HBaseUserIndex<? extends HBaseIndexTableBase>> userIndices = new HashMap<String, HBaseUserIndex<? extends HBaseIndexTableBase>>();
- private Index<String, ServerEntry> presenceIndex;
+ private Map<String, HBaseUserIndex<HBaseIndexTable>> userIndices = new HashMap<String, HBaseUserIndex<HBaseIndexTable>>();
+ private HBasePresenceIndex presenceIndex;
private Index<String, ServerEntry> ndnIndex;
private Index<Long, ServerEntry> oneLevelIndex;
private Index<Long, ServerEntry> subLevelIndex;
@@ -184,7 +187,7 @@
String oid = getAttributeTypeOid( attr );
if ( userIndices.containsKey( oid ) )
{
- HBaseUserIndex<? extends HBaseIndexTableBase> index = userIndices.get( oid );
+ HBaseUserIndex<? extends HBaseIndexTable> index = userIndices.get( oid );
for ( Value<?> value : attribute )
{
index.add( value.getBytes(), id );
@@ -224,7 +227,7 @@
@SuppressWarnings("unchecked")
public void addIndex( Index<?, ServerEntry> index )
{
- this.userIndices.put( index.getAttributeId(), ( HBaseUserIndex<? extends HBaseIndexTableBase> ) index );
+ this.userIndices.put( index.getAttributeId(), ( HBaseUserIndex<HBaseIndexTable> ) index );
}
@@ -246,7 +249,7 @@
String oid = getAttributeTypeOid( attr );
if ( userIndices.containsKey( oid ) )
{
- HBaseUserIndex<? extends HBaseIndexTableBase> index = userIndices.get( oid );
+ HBaseUserIndex<? extends HBaseIndexTable> index = userIndices.get( oid );
for ( Value<?> value : attribute )
{
index.drop( value.getBytes(), id );
@@ -350,7 +353,7 @@
}
- public Index<String, ServerEntry> getPresenceIndex()
+ public HBasePresenceIndex getPresenceIndex()
{
return presenceIndex;
}
@@ -404,7 +407,7 @@
}
- public Index<?, ServerEntry> getUserIndex( String id ) throws IndexNotFoundException
+ public HBaseUserIndex<HBaseIndexTable> getUserIndex( String id ) throws IndexNotFoundException
{
id = getAttributeTypeOid( id );
@@ -432,7 +435,7 @@
public Set<Index<?, ServerEntry>> getUserIndices()
{
- throw new UnsupportedOperationException();
+ return new HashSet<Index<?, ServerEntry>>( userIndices.values() );
}
@@ -485,7 +488,7 @@
String oid = getAttributeTypeOid( attr );
if ( userIndices.containsKey( oid ) )
{
- HBaseUserIndex<? extends HBaseIndexTableBase> index = userIndices.get( oid );
+ HBaseUserIndex<? extends HBaseIndexTable> index = userIndices.get( oid );
for ( Value<?> value : attribute )
{
index.drop( value.getBytes(), id );
@@ -554,7 +557,7 @@
String oid = getAttributeTypeOid( attr );
if ( userIndices.containsKey( oid ) )
{
- HBaseUserIndex<? extends HBaseIndexTableBase> index = userIndices.get( oid );
+ HBaseUserIndex<? extends HBaseIndexTable> index = userIndices.get( oid );
for ( Value<?> value : attribute )
{
index.add( value.getBytes(), id );
@@ -645,7 +648,7 @@
public void setPresenceIndex( Index<String, ServerEntry> presenceIndex ) throws Exception
{
- this.presenceIndex = presenceIndex;
+ this.presenceIndex = ( HBasePresenceIndex ) presenceIndex;
}
Modified: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/cursor/HBaseUserColumnIndexCursor.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/cursor/HBaseUserColumnIndexCursor.java?rev=912434&r1=912433&r2=912434&view=diff
==============================================================================
--- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/cursor/HBaseUserColumnIndexCursor.java (original)
+++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/cursor/HBaseUserColumnIndexCursor.java Sun Feb 21 22:59:19 2010
@@ -168,30 +168,30 @@
{
/*
* this case is relevant for substring filters with an initial pattern, e.g. (cn=test*)
- * - row filter is "^#test.*$"
+ * - row filter is "^=test.*$"
*/
start = indexTable.getScanKey( node.getInitial(), null );
stop = indexTable.getScanKey( node.getInitial(), HBaseIndexTable.VALUE_SCAN_STOP );
- rowFilterPattern = Pattern.compile( "^#" + Utils.getValuePattern( node, store ) + "$" );
+ rowFilterPattern = Pattern.compile( "^=" + Utils.getValuePattern( node, store ) + "$" );
}
else
{
/*
* this case is relevant for substring filters w/o an initial pattern, e.g. (cn=*test*)
* unfortunately we need to scan the whole index, but can set a row filter
- * - row filter is "^#.*test.*$"
+ * - row filter is "^=.*test.*$"
*/
start = indexTable.getScanKey( HBaseIndexTable.FULL_SCAN_START, null );
stop = indexTable.getScanKey( HBaseIndexTable.FULL_SCAN_STOP, null );
- rowFilterPattern = Pattern.compile( "^#" + Utils.getValuePattern( node, store ) + "$" );
+ rowFilterPattern = Pattern.compile( "^=" + Utils.getValuePattern( node, store ) + "$" );
}
}
else if ( value != null )
{
/*
* this case is relevant for greater than filters (the start value is set by before(IndexEntry)), e.g. (cn>=test)
- * - start row is "#test"
- * - stop row is "#0xFF"
+ * - start row is "=test"
+ * - stop row is "=0xFF"
*/
start = indexTable.getScanKey( value, null );
stop = indexTable.getScanKey( HBaseIndexTable.FULL_SCAN_STOP, null );
@@ -203,8 +203,8 @@
/*
* this case is relevant for less than filters, e.g. (cn<=test)
* unfortunately we need to scan the whole index
- * - start row is "#0x00"
- * - stop row is "#0xFF"
+ * - start row is "=0x00"
+ * - stop row is "=0xFF"
*/
start = indexTable.getScanKey( HBaseIndexTable.FULL_SCAN_START, null );
stop = indexTable.getScanKey( HBaseIndexTable.FULL_SCAN_STOP, null );
@@ -234,7 +234,7 @@
if ( iterator.hasNext() )
{
Result result = iterator.next();
- value = indexTable.getValueFromCountKey( result.getRow() );
+ value = indexTable.extractValueFromEqualsKey( result.getRow() );
candidates = indexTable.getColumnCandidates( result ).iterator();
}
else
Modified: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBasePresenceIndex.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBasePresenceIndex.java?rev=912434&r1=912433&r2=912434&view=diff
==============================================================================
--- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBasePresenceIndex.java (original)
+++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBasePresenceIndex.java Sun Feb 21 22:59:19 2010
@@ -27,7 +27,6 @@
import org.apache.directory.server.core.partition.hbase.HBaseStore;
import org.apache.directory.server.core.partition.hbase.cursor.HBasePresenceIndexCursor;
import org.apache.directory.server.core.partition.hbase.table.HBasePresenceIndexTable;
-import org.apache.directory.server.xdbm.IndexCursor;
/**
@@ -80,7 +79,7 @@
@Override
- public IndexCursor<String, ServerEntry> forwardCursor( String key ) throws Exception
+ public HBasePresenceIndexCursor forwardCursor( String key ) throws Exception
{
return new HBasePresenceIndexCursor( getPresenceIndexTable( key ) );
}
@@ -108,8 +107,8 @@
}
else
{
- HBasePresenceIndexTable presenceIndexTable = new HBasePresenceIndexTable( attributeTypeOid, store
- .getSchemaManager(), store.getTablePrefix(), store.getConfiguration(), getCacheSize() );
+ HBasePresenceIndexTable presenceIndexTable = new HBasePresenceIndexTable( attributeTypeOid, store,
+ getCacheSize() );
presenceIndexTables.put( attributeTypeOid, presenceIndexTable );
return presenceIndexTable;
}
Modified: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserColumnIndex.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserColumnIndex.java?rev=912434&r1=912433&r2=912434&view=diff
==============================================================================
--- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserColumnIndex.java (original)
+++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserColumnIndex.java Sun Feb 21 22:59:19 2010
@@ -20,10 +20,8 @@
package org.apache.directory.server.core.partition.hbase.index;
-import org.apache.directory.server.core.entry.ServerEntry;
import org.apache.directory.server.core.partition.hbase.cursor.HBaseUserColumnIndexCursor;
import org.apache.directory.server.core.partition.hbase.table.HBaseColumnIndexTable;
-import org.apache.directory.server.xdbm.IndexCursor;
import org.apache.directory.shared.ldap.filter.SubstringNode;
@@ -58,27 +56,27 @@
@Override
- public IndexCursor<Object, ServerEntry> forwardCursor( Object value ) throws Exception
+ public HBaseUserColumnIndexCursor forwardCursor( Object value ) throws Exception
{
return new HBaseUserColumnIndexCursor( getAttributeId(), value, getIndexTable(), store );
}
@Override
- public IndexCursor<Object, ServerEntry> forwardCursor() throws Exception
+ public HBaseUserColumnIndexCursor forwardCursor() throws Exception
{
return new HBaseUserColumnIndexCursor( getAttributeId(), getIndexTable(), store );
}
- public IndexCursor<Object, ServerEntry> forwardSubstringCursor( SubstringNode node ) throws Exception
+ public HBaseUserColumnIndexCursor forwardSubstringCursor( SubstringNode node ) throws Exception
{
return new HBaseUserColumnIndexCursor( getAttributeId(), node, getIndexTable(), store );
}
@Override
- protected HBaseColumnIndexTable getIndexTable() throws Exception
+ public HBaseColumnIndexTable getIndexTable() throws Exception
{
if ( indexTable == null )
{
Modified: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserIndex.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserIndex.java?rev=912434&r1=912433&r2=912434&view=diff
==============================================================================
--- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserIndex.java (original)
+++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserIndex.java Sun Feb 21 22:59:19 2010
@@ -24,7 +24,7 @@
import org.apache.directory.server.core.entry.ServerEntry;
import org.apache.directory.server.core.partition.hbase.cursor.HBaseUserIndexReverseCursor;
-import org.apache.directory.server.core.partition.hbase.table.HBaseIndexTableBase;
+import org.apache.directory.server.core.partition.hbase.table.HBaseIndexTable;
import org.apache.directory.server.core.partition.hbase.xdbmext.IndexSubstringExtension;
import org.apache.directory.server.xdbm.IndexCursor;
import org.apache.directory.shared.ldap.filter.SubstringNode;
@@ -37,7 +37,7 @@
* @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
* @version $Rev$, $Date$
*/
-public abstract class HBaseUserIndex<T extends HBaseIndexTableBase> extends AbstractHBaseIndex<Object, ServerEntry>
+public abstract class HBaseUserIndex<T extends HBaseIndexTable> extends AbstractHBaseIndex<Object, ServerEntry>
implements IndexSubstringExtension<Object, ServerEntry>
{
@@ -217,6 +217,6 @@
}
- protected abstract T getIndexTable() throws Exception;
+ public abstract T getIndexTable() throws Exception;
}
Modified: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserRowIndex.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserRowIndex.java?rev=912434&r1=912433&r2=912434&view=diff
==============================================================================
--- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserRowIndex.java (original)
+++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserRowIndex.java Sun Feb 21 22:59:19 2010
@@ -20,10 +20,8 @@
package org.apache.directory.server.core.partition.hbase.index;
-import org.apache.directory.server.core.entry.ServerEntry;
import org.apache.directory.server.core.partition.hbase.cursor.HBaseUserRowIndexCursor;
import org.apache.directory.server.core.partition.hbase.table.HBaseRowIndexTable;
-import org.apache.directory.server.xdbm.IndexCursor;
import org.apache.directory.shared.ldap.filter.SubstringNode;
@@ -58,32 +56,31 @@
@Override
- public IndexCursor<Object, ServerEntry> forwardCursor( Object value ) throws Exception
+ public HBaseUserRowIndexCursor forwardCursor( Object value ) throws Exception
{
return new HBaseUserRowIndexCursor( getAttributeId(), value, getIndexTable(), store );
}
@Override
- public IndexCursor<Object, ServerEntry> forwardCursor() throws Exception
+ public HBaseUserRowIndexCursor forwardCursor() throws Exception
{
return new HBaseUserRowIndexCursor( getAttributeId(), getIndexTable(), store );
}
- public IndexCursor<Object, ServerEntry> forwardSubstringCursor( SubstringNode node ) throws Exception
+ public HBaseUserRowIndexCursor forwardSubstringCursor( SubstringNode node ) throws Exception
{
return new HBaseUserRowIndexCursor( getAttributeId(), node, getIndexTable(), store );
}
@Override
- protected HBaseRowIndexTable getIndexTable() throws Exception
+ public HBaseRowIndexTable getIndexTable() throws Exception
{
if ( indexTable == null )
{
- indexTable = new HBaseRowIndexTable( getAttributeId(), store.getSchemaManager(), store.getTablePrefix(),
- store.getConfiguration(), getCacheSize() );
+ indexTable = new HBaseRowIndexTable( getAttributeId(), store, getCacheSize() );
}
return indexTable;
}
Added: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/IndexBuilder.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/IndexBuilder.java?rev=912434&view=auto
==============================================================================
--- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/IndexBuilder.java (added)
+++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/IndexBuilder.java Sun Feb 21 22:59:19 2010
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.directory.server.core.partition.hbase.mapreduce;
+
+
+import java.io.IOException;
+
+import org.apache.directory.server.core.entry.ServerEntry;
+import org.apache.directory.server.core.partition.hbase.HBaseStore;
+import org.apache.directory.server.core.partition.hbase.index.HBaseUserColumnIndex;
+import org.apache.directory.server.core.partition.hbase.index.HBaseUserIndex;
+import org.apache.directory.server.core.partition.hbase.index.HBaseUserRowIndex;
+import org.apache.directory.server.core.partition.hbase.table.HBaseIndexTable;
+import org.apache.directory.server.core.partition.hbase.table.HBaseMasterTable;
+import org.apache.directory.shared.ldap.entry.EntryAttribute;
+import org.apache.directory.shared.ldap.entry.Value;
+import org.apache.directory.shared.ldap.name.LdapDN;
+import org.apache.directory.shared.ldap.schema.SchemaManager;
+import org.apache.directory.shared.ldap.schema.loader.ldif.JarLdifSchemaLoader;
+import org.apache.directory.shared.ldap.schema.manager.impl.DefaultSchemaManager;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
+
+/**
+ * Mapper that build indices.
+ *
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ * @version $Rev$, $Date$
+ */
+public class IndexBuilder extends TableMapper<IntWritable, Text>
+{
+
+ private static enum Counters
+ {
+ INDEXED_ENTRIES
+ }
+
+ public static final String COLUMN_INDICES = "org.apache.directory.column.indices";
+ public static final String ROW_INDICES = "org.apache.directory.row.indices";
+ public static final String SUFFIX = "org.apache.directory.suffix";
+ public static final String TABLE_PREFIX = "org.apache.directory.table.prefix";
+
+ private HBaseStore store;
+
+
+ protected void setup( Mapper<ImmutableBytesWritable, Result, IntWritable, Text>.Context context )
+ throws IOException, InterruptedException
+ {
+ try
+ {
+ JarLdifSchemaLoader schemaLoader = new JarLdifSchemaLoader();
+ SchemaManager schemaManager = new DefaultSchemaManager( schemaLoader );
+ schemaManager.loadAllEnabled();
+
+ LdapDN suffixDn = new LdapDN( context.getConfiguration().get( SUFFIX ) );
+ suffixDn.normalize( schemaManager.getNormalizerMapping() );
+
+ store = new HBaseStore();
+
+ String columnIndices = context.getConfiguration().get( COLUMN_INDICES );
+ String[] columnIndicesSplitted = columnIndices.split( "," );
+ for ( String columnIndex : columnIndicesSplitted )
+ {
+ HBaseUserColumnIndex index = new HBaseUserColumnIndex();
+ String oid = schemaManager.getAttributeTypeRegistry().getOidByName( columnIndex );
+ index.setAttributeId( oid );
+ index.setStore( store );
+ store.addIndex( index );
+
+ }
+ String rowIndices = context.getConfiguration().get( ROW_INDICES );
+ String[] rowIndicesSplitted = rowIndices.split( "," );
+ for ( String rowIndex : rowIndicesSplitted )
+ {
+ HBaseUserRowIndex index = new HBaseUserRowIndex();
+ String oid = schemaManager.getAttributeTypeRegistry().getOidByName( rowIndex );
+ index.setAttributeId( oid );
+ index.setStore( store );
+ store.addIndex( index );
+ }
+
+ store.setSuffixDn( suffixDn.getName() );
+ store.setCacheSize( 100 );
+ String tablePrefix = context.getConfiguration().get( TABLE_PREFIX );
+ store.setTablePrefix( tablePrefix );
+ store.init( schemaManager );
+ store.getConfiguration().setBoolean( HBaseStore.STRONG_CONSISTENCY_PROPERTY, false );
+ store.getConfiguration().setBoolean( HBaseMasterTable.MAINTAIN_COUNTERS_PROPERTY, true );
+ }
+ catch ( Throwable e )
+ {
+ e.printStackTrace();
+ throw new IOException( e );
+ }
+ }
+
+
+ protected void cleanup( Mapper<ImmutableBytesWritable, Result, IntWritable, Text>.Context context )
+ throws IOException, InterruptedException
+ {
+ try
+ {
+ store.destroy();
+ }
+ catch ( Throwable e )
+ {
+ e.printStackTrace();
+ throw new IOException( e );
+ }
+ }
+
+
+ @Override
+ public void map( ImmutableBytesWritable key, Result result, Context context ) throws IOException,
+ InterruptedException
+ {
+ try
+ {
+ // write to tree table
+ Long id = Bytes.toLong( key.get() );
+ ServerEntry entry = store.getMasterTable().convertToServerEntry( id, result );
+ store.getMasterTable().addToTree( id, entry );
+
+ // write index tables
+ for ( EntryAttribute attribute : entry )
+ {
+ String attr = attribute.getId();
+ String oid = store.getSchemaManager().getAttributeTypeRegistry().getOidByName( attr );
+ if ( store.hasUserIndexOn( oid ) )
+ {
+ HBaseUserIndex<HBaseIndexTable> index = store.getUserIndex( oid );
+ for ( Value<?> value : attribute )
+ {
+ index.add( value.getBytes(), id );
+ }
+ store.getPresenceIndex().add( oid, id );
+ }
+ }
+
+ context.getCounter( Counters.INDEXED_ENTRIES ).increment( 1 );
+ }
+ catch ( Exception e )
+ {
+ System.err.println( "Error indexing entry id=" + Bytes.toLong( key.get() ) );
+ System.err.println( ">>>" + result + "<<<" );
+ e.printStackTrace();
+ }
+ }
+
+}
Added: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifImporter.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifImporter.java?rev=912434&view=auto
==============================================================================
--- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifImporter.java (added)
+++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifImporter.java Sun Feb 21 22:59:19 2010
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.directory.server.core.partition.hbase.mapreduce;
+
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+import javax.naming.NamingException;
+
+import org.apache.directory.server.constants.ServerDNConstants;
+import org.apache.directory.server.core.entry.DefaultServerEntry;
+import org.apache.directory.server.core.entry.ServerEntry;
+import org.apache.directory.server.core.partition.hbase.HBaseStore;
+import org.apache.directory.server.core.partition.hbase.table.HBaseMasterTable;
+import org.apache.directory.shared.ldap.constants.SchemaConstants;
+import org.apache.directory.shared.ldap.csn.CsnFactory;
+import org.apache.directory.shared.ldap.ldif.LdifEntry;
+import org.apache.directory.shared.ldap.ldif.LdifReader;
+import org.apache.directory.shared.ldap.name.LdapDN;
+import org.apache.directory.shared.ldap.schema.SchemaManager;
+import org.apache.directory.shared.ldap.schema.loader.ldif.JarLdifSchemaLoader;
+import org.apache.directory.shared.ldap.schema.manager.impl.DefaultSchemaManager;
+import org.apache.directory.shared.ldap.util.DateUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
+
+/**
+ * Mapper that imports LDIF.
+ *
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ * @version $Rev$, $Date$
+ */
+public class LdifImporter extends Mapper<Object, Text, IntWritable, Text>
+{
+
+ // Hadoop counters reported by this mapper.
+ private static enum Counters
+ {
+ // incremented once for every entry successfully added to the master table
+ IMPORTED_ENTRIES
+ }
+
+ // Job configuration keys read in setup():
+ // NAME_COMPONENT_COUNT - only entries whose DN has exactly this many name components are imported (default "0")
+ // SUFFIX - the suffix DN of the store
+ // TABLE_PREFIX - the table prefix handed to the store
+ public static final String NAME_COMPONENT_COUNT = "org.apache.directory.name.component.count";
+ public static final String SUFFIX = "org.apache.directory.suffix";
+ public static final String TABLE_PREFIX = "org.apache.directory.table.prefix";
+
+ // Creates the entryCSN values for imported entries.
+ // NOTE(review): the constructor argument 0 is presumably the replica id - confirm.
+ private static final CsnFactory CSN_FACTORY = new CsnFactory( 0 );
+ // parser for the LDIF records passed to map(); created in setup(), closed in cleanup()
+ private LdifReader ldifReader;
+ // the HBase store the entries are written to; created in setup(), destroyed in cleanup()
+ private HBaseStore store;
+ // number of DN name components an entry must have to be imported by this job
+ private int count;
+
+
+    /**
+     * Prepares the mapper: reads the name component count from the job
+     * configuration, creates the LDIF reader, loads all enabled schemas and
+     * initializes the HBase store for the configured suffix and table prefix.
+     * Strong consistency and counter maintenance are switched off in the
+     * store configuration.
+     *
+     * @param context the mapper context providing the job configuration
+     * @throws IOException wrapping any error raised during initialization
+     */
+    @Override
+    protected void setup( Mapper<Object, Text, IntWritable, Text>.Context context ) throws IOException,
+        InterruptedException
+    {
+        try
+        {
+            String countAsString = context.getConfiguration().get( NAME_COMPONENT_COUNT, "0" );
+            count = Integer.parseInt( countAsString );
+
+            ldifReader = new LdifReader();
+
+            JarLdifSchemaLoader schemaLoader = new JarLdifSchemaLoader();
+            SchemaManager schemaManager = new DefaultSchemaManager( schemaLoader );
+            schemaManager.loadAllEnabled();
+
+            LdapDN suffixDn = new LdapDN( context.getConfiguration().get( SUFFIX ) );
+            suffixDn.normalize( schemaManager.getNormalizerMapping() );
+
+            store = new HBaseStore();
+            store.setSuffixDn( suffixDn.getName() );
+            store.setCacheSize( 100 );
+            store.setTablePrefix( context.getConfiguration().get( TABLE_PREFIX ) );
+            store.init( schemaManager );
+            store.getConfiguration().setBoolean( HBaseStore.STRONG_CONSISTENCY_PROPERTY, false );
+            store.getConfiguration().setBoolean( HBaseMasterTable.MAINTAIN_COUNTERS_PROPERTY, false );
+        }
+        catch ( Throwable e )
+        {
+            // make the cause visible in the task log before failing the task
+            e.printStackTrace();
+            throw new IOException( e );
+        }
+    }
+
+
+    /**
+     * Releases the resources created in setup: destroys the HBase store and
+     * closes the LDIF reader. The reader is closed even if destroying the
+     * store fails, and resources that were never created are skipped.
+     *
+     * @param context the mapper context (not used)
+     * @throws IOException wrapping any error raised while releasing the resources
+     */
+    @Override
+    protected void cleanup( Mapper<Object, Text, IntWritable, Text>.Context context ) throws IOException,
+        InterruptedException
+    {
+        try
+        {
+            try
+            {
+                if ( store != null )
+                {
+                    store.destroy();
+                }
+            }
+            finally
+            {
+                // must run even when store.destroy() throws, otherwise the reader leaks
+                if ( ldifReader != null )
+                {
+                    ldifReader.close();
+                }
+            }
+        }
+        catch ( Throwable e )
+        {
+            e.printStackTrace();
+            throw new IOException( e );
+        }
+    }
+
+
+    /**
+     * Parses one LDIF record and imports every contained entry whose DN has
+     * exactly the configured number of name components. Records that cannot
+     * be parsed are reported on stderr and skipped.
+     *
+     * @param key the input key (not used)
+     * @param value the text of the LDIF record
+     * @param context the mapper context, passed on to the import
+     */
+    @Override
+    public void map( Object key, Text value, Context context ) throws IOException, InterruptedException
+    {
+        // the record is handed to the parser as is, without a leading LDIF version line
+        String record = value.toString();
+
+        try
+        {
+            for ( LdifEntry candidate : ldifReader.parseLdif( record ) )
+            {
+                if ( !candidate.isEntry() )
+                {
+                    continue;
+                }
+                if ( candidate.getDn().size() != count )
+                {
+                    continue;
+                }
+                importLdifEntry( candidate, context );
+            }
+        }
+        catch ( NamingException e )
+        {
+            System.err.println( "Error parsing LDIF: " );
+            System.err.println( ">>>" + record + "<<<" );
+            e.printStackTrace();
+        }
+    }
+
+
+    /**
+     * Converts an LDIF entry into a server entry, adds the operational
+     * attributes entryUUID, entryCSN, creatorsName and createTimestamp, and
+     * writes the entry to the master table of the store.
+     *
+     * @param ldifEntry the LDIF entry to import
+     * @param context the mapper context, used to count imported entries
+     * @throws IOException wrapping any error raised during the import
+     */
+    private void importLdifEntry( LdifEntry ldifEntry, Context context ) throws IOException
+    {
+        try
+        {
+            ServerEntry serverEntry = new DefaultServerEntry( store.getSchemaManager(), ldifEntry.getEntry() );
+
+            // operational attributes
+            String uuid = UUID.randomUUID().toString();
+            serverEntry.put( SchemaConstants.ENTRY_UUID_AT, uuid );
+            String csn = CSN_FACTORY.newInstance().toString();
+            serverEntry.put( SchemaConstants.ENTRY_CSN_AT, csn );
+            serverEntry.put( SchemaConstants.CREATORS_NAME_AT, ServerDNConstants.ADMIN_SYSTEM_DN_NORMALIZED );
+            String now = DateUtils.getGeneralizedTime();
+            serverEntry.put( SchemaConstants.CREATE_TIMESTAMP_AT, now );
+
+            // persist and account for the entry
+            store.getMasterTable().add( serverEntry );
+            context.getCounter( Counters.IMPORTED_ENTRIES ).increment( 1 );
+        }
+        catch ( Throwable failure )
+        {
+            System.err.println( "Error importing Entry: " );
+            System.err.println( ">>>" + ldifEntry + "<<<" );
+            failure.printStackTrace();
+            throw new IOException( failure );
+        }
+    }
+
+}
Added: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifInputFormat.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifInputFormat.java?rev=912434&view=auto
==============================================================================
--- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifInputFormat.java (added)
+++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifInputFormat.java Sun Feb 21 22:59:19 2010
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.directory.server.core.partition.hbase.mapreduce;
+
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+
+
+/**
+ * An {@link InputFormat} for LDIF files. Files are broken into LDIF records.
+ * Either linefeed or carriage-return are used to signal end of line. Keys are
+ * the position in the file, and values are the LDIF records.
+ *
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ * @version $Rev$, $Date$
+ */
+public class LdifInputFormat extends TextInputFormat
+{
+
+ @Override
+ public RecordReader<LongWritable, Text> createRecordReader( InputSplit split, TaskAttemptContext context )
+ {
+ return new LdifRecordReader();
+ }
+
+}
Added: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifRecordReader.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifRecordReader.java?rev=912434&view=auto
==============================================================================
--- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifRecordReader.java (added)
+++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifRecordReader.java Sun Feb 21 22:59:19 2010
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.directory.server.core.partition.hbase.mapreduce;
+
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.util.LineReader;
+
+
+/**
+ * Treats keys as offset in file and value as LDIF record.
+ *
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ * @version $Rev$, $Date$
+ */
+public class LdifRecordReader extends RecordReader<LongWritable, Text>
+{
+ private static final Log LOG = LogFactory.getLog( LdifRecordReader.class );
+
+ // used in initialize() to look up the compression codec of the input file
+ private CompressionCodecFactory compressionCodecs = null;
+ // offset at which reading of this split begins (behind a skipped partial first line, if any)
+ private long start;
+ // offset just behind the last line read so far
+ private long pos;
+ // offset at which this split ends; set to Long.MAX_VALUE for compressed input
+ private long end;
+ // line-oriented reader over the (possibly decompressed) input file
+ private LineReader in;
+
+ // current key: offset of the "dn:" line that starts the current record
+ private LongWritable key = null;
+ // current value: the text of the current record
+ private Text value = null;
+
+ // "dn:" line of the most recently started record; only used in debug messages
+ private String dn = null;
+
+
+    /**
+     * Opens the file of the given split and positions the reader at the
+     * first line that belongs to this split. For compressed files the whole
+     * stream is read. Otherwise, if the split does not begin at offset 0,
+     * reading starts one byte earlier and the first (partial) line is skipped.
+     *
+     * @param genericSplit the split to read, must be a {@link FileSplit}
+     * @param context the task attempt context providing the job configuration
+     * @throws IOException if the file cannot be opened or read
+     */
+    public void initialize( InputSplit genericSplit, TaskAttemptContext context ) throws IOException
+    {
+        FileSplit split = ( FileSplit ) genericSplit;
+        Configuration job = context.getConfiguration();
+        start = split.getStart();
+        end = start + split.getLength();
+        Path file = split.getPath();
+        compressionCodecs = new CompressionCodecFactory( job );
+        CompressionCodec codec = compressionCodecs.getCodec( file );
+
+        // open the file; plain files are positioned at the start of the split
+        FSDataInputStream fileIn = file.getFileSystem( job ).open( file );
+        if ( codec == null )
+        {
+            boolean startsMidFile = start != 0;
+            if ( startsMidFile )
+            {
+                --start;
+                fileIn.seek( start );
+            }
+            in = new LineReader( fileIn, job );
+            if ( startsMidFile )
+            {
+                // skip the first line and re-establish "start"
+                int maxBytes = ( int ) Math.min( ( long ) Integer.MAX_VALUE, end - start );
+                start += in.readLine( new Text(), 0, maxBytes );
+            }
+        }
+        else
+        {
+            in = new LineReader( codec.createInputStream( fileIn ), job );
+            end = Long.MAX_VALUE;
+        }
+        this.pos = start;
+
+        LOG.debug( "LdifRecordReader: start=" + start + ", end=" + end );
+    }
+
+
+ public boolean nextKeyValue() throws IOException
+ {
+ if ( key == null )
+ {
+ key = new LongWritable();
+ }
+ key.set( pos );
+
+ if ( value == null )
+ {
+ value = new Text();
+ }
+ value.clear();
+
+ boolean withinRecord = false;
+ StringBuilder sb = new StringBuilder();
+ while ( pos < end || withinRecord )
+ {
+ Text temp = new Text();
+ int size = in.readLine( temp );
+ if ( size == 0 )
+ {
+ // end of file
+ break;
+ }
+
+ String line = temp.toString();
+ pos += size;
+
+ // record must start with "dn:"
+ if ( !withinRecord )
+ {
+ if ( line.startsWith( "dn:" ) )
+ {
+ key.set( pos - size );
+ withinRecord = true;
+
+ if ( dn == null )
+ {
+ LOG.debug( "First record at pos " + key + ": " + line );
+ }
+ dn = line;
+ }
+ else
+ {
+ continue;
+ }
+ }
+
+ sb.append( line );
+ sb.append( '\n' );
+
+ // record ends with an empty line
+ if ( line.trim().isEmpty() && !line.startsWith( " " ) )
+ {
+ withinRecord = false;
+ break;
+ }
+ }
+ if ( sb.length() == 0 )
+ {
+ LOG.debug( "Last record at pos " + key + ": " + dn );
+ key = null;
+ value = null;
+ return false;
+ }
+ else
+ {
+ value.set( sb.toString() );
+ return true;
+ }
+ }
+
+
+    /**
+     * Gets the key of the current record.
+     *
+     * @return the current key, or null once nextKeyValue() has returned false
+     */
+    @Override
+    public LongWritable getCurrentKey()
+    {
+        return this.key;
+    }
+
+
+    /**
+     * Gets the text of the current record.
+     *
+     * @return the current value, or null once nextKeyValue() has returned false
+     */
+    @Override
+    public Text getCurrentValue()
+    {
+        return this.value;
+    }
+
+
+ /**
+ * Get the progress within the split
+ */
+ public float getProgress()
+ {
+ if ( start == end )
+ {
+ return 0.0f;
+ }
+ else
+ {
+ return Math.min( 1.0f, ( pos - start ) / ( float ) ( end - start ) );
+ }
+ }
+
+
+    /**
+     * Closes the underlying line reader, if one has been opened.
+     *
+     * @throws IOException if closing the reader fails
+     */
+    public synchronized void close() throws IOException
+    {
+        LOG.debug( "Close record at pos " + key + ": " + dn );
+        if ( in == null )
+        {
+            return;
+        }
+        in.close();
+    }
+}
Modified: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseColumnIndexTable.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseColumnIndexTable.java?rev=912434&r1=912433&r2=912434&view=diff
==============================================================================
--- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseColumnIndexTable.java (original)
+++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseColumnIndexTable.java Sun Feb 21 22:59:19 2010
@@ -25,6 +25,7 @@
import java.util.NavigableMap;
import org.apache.directory.server.core.partition.hbase.Cache;
+import org.apache.directory.shared.ldap.schema.AttributeType;
import org.apache.directory.shared.ldap.schema.SchemaManager;
import org.apache.directory.shared.ldap.util.Base64;
import org.apache.directory.shared.ldap.util.ByteBuffer;
@@ -64,13 +65,49 @@
public int count( Object value ) throws Exception
{
- byte[] countKey = getCountKey( value );
+ byte[] countKey = getEqualsKey( value );
Info info = fetchInfo( countKey );
if ( info == null )
{
return 0;
}
- return info.count.intValue();
+ return info.candidates.size();
+ }
+
+
+ /**
+ * Gets the equals key.
+ * The key has the following syntax:
+ * <pre> =value</pre>
+ * where <code>value</code> is the normalized value.
+ *
+ * @param value the value, may be null
+ * @return the equals row key for the value, or null if the value is null
+ * @throws Exception if normalizing the value fails
+ */
+ private byte[] getEqualsKey( Object value ) throws Exception
+ {
+ if ( value == null )
+ {
+ return null;
+ }
+
+ ByteBuffer bb = new ByteBuffer();
+
+ bb.append( '=' );
+
+ byte[] normValue = getNormalized( value );
+ bb.append( normValue );
+
+ return bb.copyOfUsedBytes();
+ }
+
+
+ public Object extractValueFromEqualsKey( byte[] row ) throws Exception
+ {
+ byte[] value = Bytes.tail( row, row.length - 1 );
+ AttributeType at = schemaManager.getAttributeTypeRegistry().lookup( attributeTypeOid );
+ return at.getSyntax().isHumanReadable() ? Bytes.toString( value ) : value;
}
@@ -78,7 +115,7 @@
{
ByteBuffer bb = new ByteBuffer();
- bb.append( '#' );
+ bb.append( '=' );
// add value
// there are special values to support attribute scan
@@ -112,8 +149,8 @@
public List<Long> getColumnCandidates( Object value ) throws Exception
{
- byte[] countKey = getCountKey( value );
- Info info = fetchInfo( countKey );
+ byte[] equalsKey = getEqualsKey( value );
+ Info info = fetchInfo( equalsKey );
if ( info == null )
{
return null;
@@ -140,8 +177,8 @@
*/
public boolean exists( Object value, Long id ) throws Exception
{
- byte[] countKey = getCountKey( value );
- Info info = fetchInfo( countKey );
+ byte[] equalsKey = getEqualsKey( value );
+ Info info = fetchInfo( equalsKey );
if ( info == null )
{
return false;
@@ -180,7 +217,7 @@
String key = String.valueOf( Base64.encode( row ) );
Info info = new Info();
- info.value = getValueFromCountKey( row );
+ info.value = extractValueFromEqualsKey( row );
infoCache.put( key, info );
if ( result.isEmpty() )
@@ -196,10 +233,6 @@
{
info.candidates.add( Bytes.toLong( qualifier ) );
}
- else if ( Bytes.equals( COUNT_QUALIFIER, qualifier ) )
- {
- info.count = Bytes.toLong( result.getFamilyMap( INFO_FAMILY ).get( qualifier ) );
- }
}
return info;
}
@@ -207,23 +240,11 @@
public void add( byte[] value, Long id ) throws Exception
{
- // exact match (attribute=value): #value -> count, id
- // check first if the index already exists because we won't increment the index count
- byte[] exactCountRow = getCountKey( value );
- Get exactGet = new Get( exactCountRow );
- exactGet.addColumn( INFO_FAMILY, Bytes.toBytes( id ) );
- if ( !HBaseTableHelper.exists( getIndexTablePool(), indexTableName, exactGet ) )
- {
- // get+put+put is not atomic!
- Put exactPut = new Put( exactCountRow );
- //exactPut.setWriteToWAL( false );
- exactPut.add( INFO_FAMILY, Bytes.toBytes( id ), Bytes.toBytes( id ) );
- HBaseTableHelper.put( getIndexTablePool(), indexTableName, exactPut );
-
- // increment exact match count: #value -> count
- HBaseTableHelper.increment( getIndexTablePool(), indexTableName, exactCountRow, INFO_FAMILY,
- COUNT_QUALIFIER );
- }
+ // exact match (attribute=value): =value -> id
+ byte[] equalsKey = getEqualsKey( value );
+ Put exactPut = new Put( equalsKey );
+ exactPut.add( INFO_FAMILY, Bytes.toBytes( id ), Bytes.toBytes( id ) );
+ HBaseTableHelper.put( getIndexTablePool(), indexTableName, exactPut );
// TODO: optimize - don't need to clear the áºhole cache
infoCache.clear();
@@ -234,22 +255,11 @@
public void drop( byte[] value, Long id ) throws Exception
{
- // exact match (attribute=value): #value -> count, id
- // check first if the index exists because we won't decrement the index count otherwise
- byte[] exactCountRow = getCountKey( value );
- Get exactGet = new Get( exactCountRow );
- exactGet.addColumn( INFO_FAMILY, Bytes.toBytes( id ) );
- if ( HBaseTableHelper.exists( getIndexTablePool(), indexTableName, exactGet ) )
- {
- Delete exactDel = new Delete( exactCountRow );
- exactDel.deleteColumn( INFO_FAMILY, Bytes.toBytes( id ) );
- HBaseTableHelper.delete( getIndexTablePool(), indexTableName, exactDel );
-
- // decrement exact match count: #value -> count
- HBaseTableHelper.decrement( getIndexTablePool(), indexTableName, exactCountRow, INFO_FAMILY,
- COUNT_QUALIFIER );
- // TODO: delete column if count is 0?
- }
+ // exact match (attribute=value): =value -> id
+ byte[] equalsKey = getEqualsKey( value );
+ Delete exactDel = new Delete( equalsKey );
+ exactDel.deleteColumn( INFO_FAMILY, Bytes.toBytes( id ) );
+ HBaseTableHelper.delete( getIndexTablePool(), indexTableName, exactDel );
// TODO: optimize - don't need to clear the áºhole cache
infoCache.clear();
@@ -260,7 +270,6 @@
class Info
{
Object value;
- Long count = 0L;
List<Long> candidates = new ArrayList<Long>();
}
Modified: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseIndexTable.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseIndexTable.java?rev=912434&r1=912433&r2=912434&view=diff
==============================================================================
--- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseIndexTable.java (original)
+++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseIndexTable.java Sun Feb 21 22:59:19 2010
@@ -36,7 +36,6 @@
public static final byte[] INFO_FAMILY = Bytes.toBytes( "info" );
public static final byte[] ID_QUALIFIER = Bytes.toBytes( "id" );
- public static final byte[] COUNT_QUALIFIER = Bytes.toBytes( "count" );
public static final byte[] VALUE_SCAN_START = new byte[]
{ 0x00 };
public static final byte[] VALUE_SCAN_STOP = new byte[]
@@ -66,4 +65,10 @@
public abstract ResultScanner getScanner( Scan scan ) throws Exception;
+
+ public abstract void add( byte[] value, Long id ) throws Exception;
+
+
+ public abstract void drop( byte[] value, Long id ) throws Exception;
+
}
\ No newline at end of file
Modified: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseIndexTableBase.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseIndexTableBase.java?rev=912434&r1=912433&r2=912434&view=diff
==============================================================================
--- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseIndexTableBase.java (original)
+++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseIndexTableBase.java Sun Feb 21 22:59:19 2010
@@ -25,9 +25,7 @@
import org.apache.directory.shared.ldap.entry.client.ClientBinaryValue;
import org.apache.directory.shared.ldap.schema.AttributeType;
import org.apache.directory.shared.ldap.schema.SchemaManager;
-import org.apache.directory.shared.ldap.util.ByteBuffer;
import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
@@ -44,10 +42,10 @@
protected String attributeTypeOid;
protected String indexTableName;
- private HTablePool indexTablePool;
+ private HBaseTablePool indexTablePool;
protected SchemaManager schemaManager;
protected HBaseConfiguration configuration;
- protected Cache<Object, Long> countCache;
+ protected Cache<Object, Integer> countCache;
protected Cache<Object, Boolean> existsCache;
@@ -59,13 +57,17 @@
this.configuration = configuration;
String name = schemaManager.getGlobalOidRegistry().getPrimaryName( attributeTypeOid );
this.indexTableName = tablePrefix + "index_" + name;
- this.countCache = new Cache<Object, Long>( cacheSize );
+ this.countCache = new Cache<Object, Integer>( cacheSize );
this.existsCache = new Cache<Object, Boolean>( cacheSize );
}
public void close() throws Exception
{
+ if ( indexTablePool != null )
+ {
+ indexTablePool.close();
+ }
}
@@ -75,12 +77,6 @@
}
- public abstract void add( byte[] value, Long id ) throws Exception;
-
-
- public abstract void drop( byte[] value, Long id ) throws Exception;
-
-
protected byte[] getNormalized( Object value ) throws Exception
{
AttributeType at = schemaManager.getAttributeTypeRegistry().lookup( attributeTypeOid );
@@ -113,51 +109,15 @@
}
- protected HTablePool getIndexTablePool() throws Exception
+ protected HBaseTablePool getIndexTablePool() throws Exception
{
if ( indexTablePool == null )
{
// ensure table is created
HBaseTableHelper.createTable( configuration, indexTableName, INFO_FAMILY );
- indexTablePool = new HTablePool( configuration, 16 );
+ indexTablePool = new HBaseTablePool( indexTableName, configuration );
}
return indexTablePool;
}
-
- /**
- * Gets the count key.
- * The key has the following syntax:
- * <pre> #value</pre>
- * where <code>value</code> is the normalized value.
- *
- * @param value the value
- * @return the count row key for the value
- * @throws Exception
- */
- protected byte[] getCountKey( Object value ) throws Exception
- {
- if ( value == null )
- {
- return null;
- }
-
- ByteBuffer bb = new ByteBuffer();
-
- bb.append( '#' );
-
- byte[] normValue = getNormalized( value );
- bb.append( normValue );
-
- return bb.copyOfUsedBytes();
- }
-
-
- public Object getValueFromCountKey( byte[] row ) throws Exception
- {
- byte[] value = Bytes.tail( row, row.length - 1 );
- AttributeType at = schemaManager.getAttributeTypeRegistry().lookup( attributeTypeOid );
- return at.getSyntax().isHumanReadable() ? Bytes.toString( value ) : value;
- }
-
}
\ No newline at end of file
Modified: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseMasterTable.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseMasterTable.java?rev=912434&r1=912433&r2=912434&view=diff
==============================================================================
--- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseMasterTable.java (original)
+++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseMasterTable.java Sun Feb 21 22:59:19 2010
@@ -42,7 +42,6 @@
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -86,14 +85,16 @@
public static final String MASTER_TABLE = "master";
public static final String TREE_TABLE = "tree";
+ public static final String MAINTAIN_COUNTERS_PROPERTY = "org.apache.directory.maintain.counters";
private HBaseConfiguration configuration;
+ private boolean maintainCounters;
private SchemaManager schemaManager;
private LdapDN suffixDn;
- private HTablePool masterTablePool;
+ private HBaseTablePool masterTablePool;
private String masterTableName;
- private HTablePool treeTablePool;
+ private HBaseTablePool treeTablePool;
private String treeTableName;
private MasterTreeInfo suffixMti;
@@ -121,6 +122,7 @@
this.schemaManager = store.getSchemaManager();
this.suffixDn = store.getSuffix();
this.configuration = store.getConfiguration();
+ this.maintainCounters = configuration.getBoolean( MAINTAIN_COUNTERS_PROPERTY, true );
this.masterTableName = store.getTablePrefix() + MASTER_TABLE;
this.treeTableName = store.getTablePrefix() + TREE_TABLE;
this.suffixMti = new MasterTreeInfo( ROOT_ID, suffixDn.getNormName(), null );
@@ -141,27 +143,40 @@
public void close() throws Exception
{
+ if ( masterTablePool != null )
+ {
+ masterTablePool.close();
+ }
+ if ( treeTablePool != null )
+ {
+ treeTablePool.close();
+ }
}
public Long add( ServerEntry entry ) throws Exception
{
+ Long id = addToMaster( entry );
+ addToTree( id, entry );
+ return id;
+ }
+
+
+ public Long addToMaster( ServerEntry entry ) throws Exception
+ {
Long id = nextId();
List<Long> parentIds = fetchParentIds( entry.getDn(), false );
String upRdn;
String normRdn;
- MasterTreeInfo treeTableKey;
if ( entry.getDn().equals( suffixDn ) )
{
upRdn = entry.getDn().getName();
normRdn = entry.getDn().getNormName();
- treeTableKey = new MasterTreeInfo( parentIds.get( 0 ), normRdn, upRdn );
}
else
{
upRdn = entry.getDn().getRdn().getUpName();
normRdn = entry.getDn().getRdn().getNormName();
- treeTableKey = new MasterTreeInfo( parentIds.get( 0 ), normRdn, upRdn );
}
// put to master and tree table
@@ -169,12 +184,9 @@
masterPut.add( TREE_INFO_FAMILY, PARENT_ID_QUALIFIER, Bytes.toBytes( parentIds.get( 0 ) ) );
masterPut.add( TREE_INFO_FAMILY, UP_RDN_QUALIFIER, Bytes.toBytes( upRdn ) );
masterPut.add( TREE_INFO_FAMILY, NORM_RDN_QUALIFIER, Bytes.toBytes( normRdn ) );
- Put treePut = new Put( treeTableKey.treeTableKey );
- treePut.add( TREE_INFO_FAMILY, ID_QUALIFIER, Bytes.toBytes( id ) );
for ( EntryAttribute attribute : entry )
{
String attr = attribute.getUpId();
- String attrOid = ( ( ServerAttribute ) attribute ).getAttributeType().getOid();
for ( int i = 0; i < attribute.size(); i++ )
{
Value<?> value = attribute.get( i );
@@ -184,6 +196,42 @@
// objectClass1 -> top
masterPut.add( UP_ATTRIBUTES_FAMILY, Bytes.add( Bytes.toBytes( attr ), Bytes.toBytes( i ) ), value
.getBytes() );
+ }
+ }
+ HBaseTableHelper.put( getMasterTablePool(), masterTableName, masterPut );
+
+ return id;
+ }
+
+
+ public Long addToTree( Long id, ServerEntry entry ) throws Exception
+ {
+ List<Long> parentIds = fetchParentIds( entry.getDn(), false );
+ String upRdn;
+ String normRdn;
+ MasterTreeInfo treeTableKey;
+ if ( entry.getDn().equals( suffixDn ) )
+ {
+ upRdn = entry.getDn().getName();
+ normRdn = entry.getDn().getNormName();
+ treeTableKey = new MasterTreeInfo( parentIds.get( 0 ), normRdn, upRdn );
+ }
+ else
+ {
+ upRdn = entry.getDn().getRdn().getUpName();
+ normRdn = entry.getDn().getRdn().getNormName();
+ treeTableKey = new MasterTreeInfo( parentIds.get( 0 ), normRdn, upRdn );
+ }
+
+ // put to tree table
+ Put treePut = new Put( treeTableKey.treeTableKey );
+ treePut.add( TREE_INFO_FAMILY, ID_QUALIFIER, Bytes.toBytes( id ) );
+ for ( EntryAttribute attribute : entry )
+ {
+ String attrOid = ( ( ServerAttribute ) attribute ).getAttributeType().getOid();
+ for ( int i = 0; i < attribute.size(); i++ )
+ {
+ Value<?> value = attribute.get( i );
// normAttributes:
// 2.5.4.0:inetorgperson -> 0
@@ -205,31 +253,33 @@
}
}
}
- HBaseTableHelper.put( getMasterTablePool(), masterTableName, masterPut );
HBaseTableHelper.put( getTreeTablePool(), treeTableName, treePut );
- // update parent one-level count
- MasterTreeInfo parentKey = fetchMasterTreeInfo( parentIds.get( 0 ) );
- if ( parentKey != null )
+ if ( maintainCounters )
{
- HBaseTableHelper.increment( getTreeTablePool(), treeTableName, parentKey.treeTableKey, TREE_INFO_FAMILY,
- ONE_LEVEL_COUNT_QUALIFIER );
- }
-
- // update all parents sub-level count
- for ( Long parentId : parentIds )
- {
- parentKey = fetchMasterTreeInfo( parentId );
+ // update parent one-level count
+ MasterTreeInfo parentKey = fetchMasterTreeInfo( parentIds.get( 0 ) );
if ( parentKey != null )
{
HBaseTableHelper.increment( getTreeTablePool(), treeTableName, parentKey.treeTableKey,
- TREE_INFO_FAMILY, SUB_LEVEL_COUNT_QUALIFIER );
+ TREE_INFO_FAMILY, ONE_LEVEL_COUNT_QUALIFIER );
}
- }
- // clear caches
- oneLevelCountCache.clear();
- subLevelCountCache.clear();
+ // update all parents sub-level count
+ for ( Long parentId : parentIds )
+ {
+ parentKey = fetchMasterTreeInfo( parentId );
+ if ( parentKey != null )
+ {
+ HBaseTableHelper.increment( getTreeTablePool(), treeTableName, parentKey.treeTableKey,
+ TREE_INFO_FAMILY, SUB_LEVEL_COUNT_QUALIFIER );
+ }
+ }
+
+ // clear caches
+ oneLevelCountCache.clear();
+ subLevelCountCache.clear();
+ }
return id;
}
@@ -247,22 +297,25 @@
Delete treeDel = new Delete( key.treeTableKey );
HBaseTableHelper.delete( getTreeTablePool(), treeTableName, treeDel );
- // update parent one-level count
- Long parentId = key.parentId;
- if ( parentId > ROOT_ID )
- {
- MasterTreeInfo parentKey = fetchMasterTreeInfo( parentId );
- HBaseTableHelper.decrement( getTreeTablePool(), treeTableName, parentKey.treeTableKey, TREE_INFO_FAMILY,
- ONE_LEVEL_COUNT_QUALIFIER );
- }
-
- // update sub-level count of all parents
- while ( parentId > ROOT_ID )
- {
- MasterTreeInfo parentKey = fetchMasterTreeInfo( parentId );
- HBaseTableHelper.decrement( getTreeTablePool(), treeTableName, parentKey.treeTableKey, TREE_INFO_FAMILY,
- SUB_LEVEL_COUNT_QUALIFIER );
- parentId = parentKey.parentId;
+ if ( maintainCounters )
+ {
+ // update parent one-level count
+ Long parentId = key.parentId;
+ if ( parentId > ROOT_ID )
+ {
+ MasterTreeInfo parentKey = fetchMasterTreeInfo( parentId );
+ HBaseTableHelper.decrement( getTreeTablePool(), treeTableName, parentKey.treeTableKey,
+ TREE_INFO_FAMILY, ONE_LEVEL_COUNT_QUALIFIER );
+ }
+
+ // update sub-level count of all parents
+ while ( parentId > ROOT_ID )
+ {
+ MasterTreeInfo parentKey = fetchMasterTreeInfo( parentId );
+ HBaseTableHelper.decrement( getTreeTablePool(), treeTableName, parentKey.treeTableKey,
+ TREE_INFO_FAMILY, SUB_LEVEL_COUNT_QUALIFIER );
+ parentId = parentKey.parentId;
+ }
}
// clear caches
@@ -590,32 +643,46 @@
return fetchId( suffixMti );
}
+ private List<Long> ids = new ArrayList<Long>();
+
private long nextId() throws Exception
{
- byte[] id = HBaseTableHelper.increment( getMasterTablePool(), masterTableName, SEQUENCE_ROW, TREE_INFO_FAMILY,
- SEQUENCE_QUALIFIER );
- return Bytes.toLong( id );
+ if ( ids.isEmpty() )
+ {
+ long amount = 100;
+ byte[] id = HBaseTableHelper.increment( getMasterTablePool(), masterTableName, SEQUENCE_ROW,
+ TREE_INFO_FAMILY, SEQUENCE_QUALIFIER, amount );
+ long upper = Bytes.toLong( id );
+ long lower = upper - amount + 1;
+ for ( long l = lower; l <= upper; l++ )
+ {
+ ids.add( l );
+ }
+ }
+
+ Long id = ids.remove( 0 );
+ return id;
}
- private HTablePool getMasterTablePool() throws Exception
+ private HBaseTablePool getMasterTablePool() throws Exception
{
if ( masterTablePool == null )
{
HBaseTableHelper.createTable( configuration, masterTableName, TREE_INFO_FAMILY, UP_ATTRIBUTES_FAMILY );
- masterTablePool = new HTablePool( configuration, 16 );
+ masterTablePool = new HBaseTablePool( masterTableName, configuration );
}
return masterTablePool;
}
- private HTablePool getTreeTablePool() throws Exception
+ private HBaseTablePool getTreeTablePool() throws Exception
{
if ( treeTablePool == null )
{
HBaseTableHelper.createTable( configuration, treeTableName, TREE_INFO_FAMILY, NORM_ATTRIBUTES_FAMILY );
- treeTablePool = new HTablePool( configuration, 16 );
+ treeTablePool = new HBaseTablePool( treeTableName, configuration );
}
return treeTablePool;
}
@@ -697,4 +764,16 @@
}
}
+
+ public String getTreeTableName()
+ {
+ return treeTableName;
+ }
+
+
+ public String getMasterTableName()
+ {
+ return masterTableName;
+ }
+
}
Modified: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBasePresenceIndexTable.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBasePresenceIndexTable.java?rev=912434&r1=912433&r2=912434&view=diff
==============================================================================
--- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBasePresenceIndexTable.java (original)
+++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBasePresenceIndexTable.java Sun Feb 21 22:59:19 2010
@@ -21,13 +21,14 @@
import org.apache.directory.server.core.partition.hbase.Cache;
+import org.apache.directory.server.core.partition.hbase.HBaseStore;
+import org.apache.directory.server.core.partition.hbase.cursor.HBasePresenceIndexCursor;
import org.apache.directory.shared.ldap.schema.SchemaManager;
import org.apache.directory.shared.ldap.util.Base64;
import org.apache.directory.shared.ldap.util.ByteBuffer;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
@@ -48,35 +49,35 @@
public static final byte[] COUNT_ROW = Bytes.toBytes( "!" );
public static final byte[] INFO_FAMILY = Bytes.toBytes( "info" );
public static final byte[] ID_QUALIFIER = Bytes.toBytes( "id" );
- public static final byte[] COUNT_QUALIFIER = Bytes.toBytes( "count" );
public static final byte[] VALUE_SCAN_FIRST_ENTRYID = new byte[]
{ 0x00 };
public static final byte[] VALUE_SCAN_LAST_ENTRYID = new byte[]
{ ( byte ) 0xFF };
protected String indexTableName;
- private HTablePool indexTablePool;
- private SchemaManager schemaManager;
- private HBaseConfiguration configuration;
+ private HBaseTablePool indexTablePool;
+ private HBaseStore store;
private String attributeTypeOid;
- private Cache<String, Long> countCache;
+ private Cache<String, Integer> countCache;
- public HBasePresenceIndexTable( String attributeTypeOid, SchemaManager schemaManager, String tablePrefix,
- HBaseConfiguration configuration, int cacheSize ) throws Exception
+ public HBasePresenceIndexTable( String attributeTypeOid, HBaseStore store, int cacheSize ) throws Exception
{
this.attributeTypeOid = attributeTypeOid;
- this.schemaManager = schemaManager;
- this.configuration = configuration;
- String name = schemaManager.getGlobalOidRegistry().getPrimaryName( attributeTypeOid );
- this.indexTableName = tablePrefix + "index_" + name;
- this.countCache = new Cache<String, Long>( cacheSize );
+ this.store = store;
+ String name = store.getSchemaManager().getGlobalOidRegistry().getPrimaryName( attributeTypeOid );
+ this.indexTableName = store.getTablePrefix() + "index_" + name;
+ this.countCache = new Cache<String, Integer>( cacheSize );
}
public void close() throws Exception
{
+ if ( indexTablePool != null )
+ {
+ indexTablePool.close();
+ }
}
@@ -121,10 +122,23 @@
return countCache.get( attributeTypeOid ).intValue();
}
- Long count = HBaseTableHelper.getLongValue( getIndexTablePool(), indexTableName, COUNT_ROW, INFO_FAMILY,
- COUNT_QUALIFIER, 0L );
+ // TODO: scan directly instead of using the cursor?
+ HBasePresenceIndexCursor cursor = new HBasePresenceIndexCursor( this );
+ int count = 0;
+ int limit = 100;
+ while ( cursor.next() && count <= limit )
+ {
+ count++;
+ }
+ if ( count >= limit )
+ {
+ // this is just a guess to avoid subtree scan
+ count = store.count() / 10;
+ }
+ cursor.close();
+
countCache.put( attributeTypeOid, count );
- return count.intValue();
+ return count;
}
@@ -153,20 +167,10 @@
public void add( Long entryId ) throws Exception
{
// presence (attribute=*): *<id> -> id
- // check first if the index already exists because we won't increment the index count
byte[] presenceRow = getPresenceKey( Bytes.toBytes( entryId ) );
- Get presenceGet = new Get( presenceRow );
- if ( !HBaseTableHelper.exists( getIndexTablePool(), indexTableName, presenceGet ) )
- {
- // get+put+put is not atomic!
- Put presencePut = new Put( presenceRow );
- presencePut.setWriteToWAL( false );
- presencePut.add( INFO_FAMILY, ID_QUALIFIER, Bytes.toBytes( entryId ) );
- HBaseTableHelper.put( getIndexTablePool(), indexTableName, presencePut );
-
- // increment existence count: attribute: -> count
- HBaseTableHelper.increment( getIndexTablePool(), indexTableName, COUNT_ROW, INFO_FAMILY, COUNT_QUALIFIER );
- }
+ Put presencePut = new Put( presenceRow );
+ presencePut.add( INFO_FAMILY, ID_QUALIFIER, Bytes.toBytes( entryId ) );
+ HBaseTableHelper.put( getIndexTablePool(), indexTableName, presencePut );
countCache.clear();
}
@@ -175,29 +179,21 @@
public void drop( Long entryId ) throws Exception
{
// presence (attribute=*): *<id> -> id
- // check first if the index exists because we won't decrement the index count otherwise
byte[] presenceRow = getPresenceKey( Bytes.toBytes( entryId ) );
- Get presenceGet = new Get( presenceRow );
- if ( HBaseTableHelper.exists( getIndexTablePool(), indexTableName, presenceGet ) )
- {
- Delete presenceDel = new Delete( presenceRow );
- HBaseTableHelper.delete( getIndexTablePool(), indexTableName, presenceDel );
-
- // decrement existence count: attribute: -> count
- HBaseTableHelper.decrement( getIndexTablePool(), indexTableName, COUNT_ROW, INFO_FAMILY, COUNT_QUALIFIER );
- }
+ Delete presenceDel = new Delete( presenceRow );
+ HBaseTableHelper.delete( getIndexTablePool(), indexTableName, presenceDel );
countCache.clear();
}
- protected HTablePool getIndexTablePool() throws Exception
+ protected HBaseTablePool getIndexTablePool() throws Exception
{
if ( indexTablePool == null )
{
// ensure table is created
- HBaseTableHelper.createTable( configuration, indexTableName, INFO_FAMILY );
- indexTablePool = new HTablePool( configuration, 16 );
+ HBaseTableHelper.createTable( store.getConfiguration(), indexTableName, INFO_FAMILY );
+ indexTablePool = new HBaseTablePool( indexTableName, store.getConfiguration() );
}
return indexTablePool;
}
Modified: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseRowIndexTable.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseRowIndexTable.java?rev=912434&r1=912433&r2=912434&view=diff
==============================================================================
--- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseRowIndexTable.java (original)
+++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseRowIndexTable.java Sun Feb 21 22:59:19 2010
@@ -20,11 +20,13 @@
package org.apache.directory.server.core.partition.hbase.table;
+import org.apache.directory.server.core.entry.ServerEntry;
+import org.apache.directory.server.core.partition.hbase.HBaseStore;
+import org.apache.directory.server.core.partition.hbase.index.HBaseUserIndex;
+import org.apache.directory.server.xdbm.IndexCursor;
import org.apache.directory.shared.ldap.schema.AttributeType;
-import org.apache.directory.shared.ldap.schema.SchemaManager;
import org.apache.directory.shared.ldap.util.Base64;
import org.apache.directory.shared.ldap.util.ByteBuffer;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
@@ -45,12 +47,13 @@
public class HBaseRowIndexTable extends HBaseIndexTableBase
{
private static final Logger LOG = LoggerFactory.getLogger( HBaseRowIndexTable.class );
+ private HBaseStore store;
- public HBaseRowIndexTable( String attributeTypeOid, SchemaManager schemaManager, String tablePrefix,
- HBaseConfiguration configuration, int cacheSize ) throws Exception
+ public HBaseRowIndexTable( String attributeTypeOid, HBaseStore store, int cacheSize ) throws Exception
{
- super( attributeTypeOid, schemaManager, tablePrefix, configuration, cacheSize );
+ super( attributeTypeOid, store.getSchemaManager(), store.getTablePrefix(), store.getConfiguration(), cacheSize );
+ this.store = store;
}
@@ -146,15 +149,24 @@
return countCache.get( value ).intValue();
}
- byte[] row = getCountKey( value );
- if ( row == null )
+ // TODO: scan directly instead of using the cursor?
+ HBaseUserIndex<HBaseIndexTable> index = store.getUserIndex( attributeTypeOid );
+ IndexCursor<Object, ServerEntry> cursor = index.forwardCursor( value );
+ int count = 0;
+ int limit = 100;
+ while ( cursor.next() && count <= limit )
+ {
+ count++;
+ }
+ if ( count >= limit )
{
- return 0;
+ // this is just a guess to avoid subtree scan
+ count = store.count() / 10;
}
- Long count = HBaseTableHelper.getLongValue( getIndexTablePool(), indexTableName, row, INFO_FAMILY,
- COUNT_QUALIFIER, 0L );
+ cursor.close();
+
countCache.put( value, count );
- return count.intValue();
+ return count;
}
@@ -181,22 +193,10 @@
public void add( byte[] value, Long id ) throws Exception
{
// exact match (attribute=value): =value<id> -> id, value
- // check first if the index already exists because we won't increment the index count
byte[] exactRow = getEqualsKey( value, id );
- Get exactGet = new Get( exactRow );
- if ( !HBaseTableHelper.exists( getIndexTablePool(), indexTableName, exactGet ) )
- {
- // get+put+put is not atomic!
- Put exactPut = new Put( exactRow );
- //exactPut.setWriteToWAL( false );
- exactPut.add( INFO_FAMILY, ID_QUALIFIER, Bytes.toBytes( id ) );
- HBaseTableHelper.put( getIndexTablePool(), indexTableName, exactPut );
-
- // increment exact match count: #value -> count
- byte[] exactCountRow = getCountKey( value );
- HBaseTableHelper.increment( getIndexTablePool(), indexTableName, exactCountRow, INFO_FAMILY,
- COUNT_QUALIFIER );
- }
+ Put exactPut = new Put( exactRow );
+ exactPut.add( INFO_FAMILY, ID_QUALIFIER, Bytes.toBytes( id ) );
+ HBaseTableHelper.put( getIndexTablePool(), indexTableName, exactPut );
// TODO: optimize - don't need to clear the whole cache
countCache.clear();
@@ -207,20 +207,9 @@
public void drop( byte[] value, Long id ) throws Exception
{
// exact match (attribute=value): =value<id> -> id
- // check first if the index exists because we won't decrement the index count otherwise
byte[] exactRow = getEqualsKey( value, id );
- Get exactGet = new Get( exactRow );
- if ( HBaseTableHelper.exists( getIndexTablePool(), indexTableName, exactGet ) )
- {
- Delete exactDel = new Delete( exactRow );
- HBaseTableHelper.delete( getIndexTablePool(), indexTableName, exactDel );
-
- // decrement exact match count: #value -> count
- byte[] exactCountRow = getCountKey( value );
- HBaseTableHelper.decrement( getIndexTablePool(), indexTableName, exactCountRow, INFO_FAMILY,
- COUNT_QUALIFIER );
- // TODO: delete column if count is 0?
- }
+ Delete exactDel = new Delete( exactRow );
+ HBaseTableHelper.delete( getIndexTablePool(), indexTableName, exactDel );
// TODO: optimize - don't need to clear the whole cache
countCache.clear();