You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by se...@apache.org on 2010/02/21 23:59:20 UTC

svn commit: r912434 [1/2] - in /directory/sandbox/seelmann/hbase-partition/src: main/java/org/apache/directory/server/core/partition/hbase/ main/java/org/apache/directory/server/core/partition/hbase/cursor/ main/java/org/apache/directory/server/core/pa...

Author: seelmann
Date: Sun Feb 21 22:59:19 2010
New Revision: 912434

URL: http://svn.apache.org/viewvc?rev=912434&view=rev
Log:
o added Map/Reduce job for LDIF import and indexing
o added pool for HTable objects
o removed counters for indices (they don't scale)


Added:
    directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/IndexBuilder.java
    directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifImporter.java
    directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifInputFormat.java
    directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifRecordReader.java
    directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseTablePool.java
    directory/sandbox/seelmann/hbase-partition/src/test/java/org/apache/directory/server/core/partition/hbase/it/mapreduce/
    directory/sandbox/seelmann/hbase-partition/src/test/java/org/apache/directory/server/core/partition/hbase/it/mapreduce/GetPerformanceEvaluation.java
    directory/sandbox/seelmann/hbase-partition/src/test/java/org/apache/directory/server/core/partition/hbase/it/mapreduce/LdifImportAndIndexIT.java
    directory/sandbox/seelmann/hbase-partition/src/test/java/org/apache/directory/server/core/partition/hbase/it/mapreduce/RemoteLdifImport.java
    directory/sandbox/seelmann/hbase-partition/src/test/java/org/apache/directory/server/core/partition/hbase/it/mapreduce/RemoteRunner.java
    directory/sandbox/seelmann/hbase-partition/src/test/resources/testdata-5.ldif
Removed:
    directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/IndexMapper.java
    directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/RowCounterMapper.java
    directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/SimpleMapper.java
Modified:
    directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/HBaseStore.java
    directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/cursor/HBaseUserColumnIndexCursor.java
    directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBasePresenceIndex.java
    directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserColumnIndex.java
    directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserIndex.java
    directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserRowIndex.java
    directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseColumnIndexTable.java
    directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseIndexTable.java
    directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseIndexTableBase.java
    directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseMasterTable.java
    directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBasePresenceIndexTable.java
    directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseRowIndexTable.java
    directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseTableHelper.java
    directory/sandbox/seelmann/hbase-partition/src/test/java/org/apache/directory/server/core/partition/hbase/HBaseClusterTestCaseAdapter.java
    directory/sandbox/seelmann/hbase-partition/src/test/java/org/apache/directory/server/core/partition/hbase/HBaseDistributedRunner.java
    directory/sandbox/seelmann/hbase-partition/src/test/java/org/apache/directory/server/core/partition/hbase/HBaseEmbeddedRunner.java
    directory/sandbox/seelmann/hbase-partition/src/test/java/org/apache/directory/server/core/partition/hbase/table/AbstractHBaseTableTest.java
    directory/sandbox/seelmann/hbase-partition/src/test/java/org/apache/directory/server/core/partition/hbase/table/HBaseIndexTableTest.java
    directory/sandbox/seelmann/hbase-partition/src/test/java/org/apache/directory/server/core/partition/hbase/table/HBasePresenceIndexTableTest.java
    directory/sandbox/seelmann/hbase-partition/src/test/resources/log4j.properties

Modified: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/HBaseStore.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/HBaseStore.java?rev=912434&r1=912433&r2=912434&view=diff
==============================================================================
--- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/HBaseStore.java (original)
+++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/HBaseStore.java Sun Feb 21 22:59:19 2010
@@ -22,6 +22,7 @@
 
 import java.io.File;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -39,7 +40,7 @@
 import org.apache.directory.server.core.partition.hbase.index.HBaseSubAliasIndex;
 import org.apache.directory.server.core.partition.hbase.index.HBaseSubLevelIndex;
 import org.apache.directory.server.core.partition.hbase.index.HBaseUserIndex;
-import org.apache.directory.server.core.partition.hbase.table.HBaseIndexTableBase;
+import org.apache.directory.server.core.partition.hbase.table.HBaseIndexTable;
 import org.apache.directory.server.core.partition.hbase.table.HBaseMasterTable;
 import org.apache.directory.server.xdbm.Index;
 import org.apache.directory.server.xdbm.IndexCursor;
@@ -67,6 +68,8 @@
 public class HBaseStore implements Store<ServerEntry>
 {
 
+    public static final String STRONG_CONSISTENCY_PROPERTY = "org.apache.directory.strong.consistency";
+
     private String tablePrefix;
 
     private LdapDN suffixDn;
@@ -80,8 +83,8 @@
 
     private HBaseMasterTable masterTable;
 
-    private Map<String, HBaseUserIndex<? extends HBaseIndexTableBase>> userIndices = new HashMap<String, HBaseUserIndex<? extends HBaseIndexTableBase>>();
-    private Index<String, ServerEntry> presenceIndex;
+    private Map<String, HBaseUserIndex<HBaseIndexTable>> userIndices = new HashMap<String, HBaseUserIndex<HBaseIndexTable>>();
+    private HBasePresenceIndex presenceIndex;
     private Index<String, ServerEntry> ndnIndex;
     private Index<Long, ServerEntry> oneLevelIndex;
     private Index<Long, ServerEntry> subLevelIndex;
@@ -184,7 +187,7 @@
             String oid = getAttributeTypeOid( attr );
             if ( userIndices.containsKey( oid ) )
             {
-                HBaseUserIndex<? extends HBaseIndexTableBase> index = userIndices.get( oid );
+                HBaseUserIndex<? extends HBaseIndexTable> index = userIndices.get( oid );
                 for ( Value<?> value : attribute )
                 {
                     index.add( value.getBytes(), id );
@@ -224,7 +227,7 @@
     @SuppressWarnings("unchecked")
     public void addIndex( Index<?, ServerEntry> index )
     {
-        this.userIndices.put( index.getAttributeId(), ( HBaseUserIndex<? extends HBaseIndexTableBase> ) index );
+        this.userIndices.put( index.getAttributeId(), ( HBaseUserIndex<HBaseIndexTable> ) index );
     }
 
 
@@ -246,7 +249,7 @@
             String oid = getAttributeTypeOid( attr );
             if ( userIndices.containsKey( oid ) )
             {
-                HBaseUserIndex<? extends HBaseIndexTableBase> index = userIndices.get( oid );
+                HBaseUserIndex<? extends HBaseIndexTable> index = userIndices.get( oid );
                 for ( Value<?> value : attribute )
                 {
                     index.drop( value.getBytes(), id );
@@ -350,7 +353,7 @@
     }
 
 
-    public Index<String, ServerEntry> getPresenceIndex()
+    public HBasePresenceIndex getPresenceIndex()
     {
         return presenceIndex;
     }
@@ -404,7 +407,7 @@
     }
 
 
-    public Index<?, ServerEntry> getUserIndex( String id ) throws IndexNotFoundException
+    public HBaseUserIndex<HBaseIndexTable> getUserIndex( String id ) throws IndexNotFoundException
     {
         id = getAttributeTypeOid( id );
 
@@ -432,7 +435,7 @@
 
     public Set<Index<?, ServerEntry>> getUserIndices()
     {
-        throw new UnsupportedOperationException();
+        return new HashSet<Index<?, ServerEntry>>( userIndices.values() );
     }
 
 
@@ -485,7 +488,7 @@
             String oid = getAttributeTypeOid( attr );
             if ( userIndices.containsKey( oid ) )
             {
-                HBaseUserIndex<? extends HBaseIndexTableBase> index = userIndices.get( oid );
+                HBaseUserIndex<? extends HBaseIndexTable> index = userIndices.get( oid );
                 for ( Value<?> value : attribute )
                 {
                     index.drop( value.getBytes(), id );
@@ -554,7 +557,7 @@
             String oid = getAttributeTypeOid( attr );
             if ( userIndices.containsKey( oid ) )
             {
-                HBaseUserIndex<? extends HBaseIndexTableBase> index = userIndices.get( oid );
+                HBaseUserIndex<? extends HBaseIndexTable> index = userIndices.get( oid );
                 for ( Value<?> value : attribute )
                 {
                     index.add( value.getBytes(), id );
@@ -645,7 +648,7 @@
 
     public void setPresenceIndex( Index<String, ServerEntry> presenceIndex ) throws Exception
     {
-        this.presenceIndex = presenceIndex;
+        this.presenceIndex = ( HBasePresenceIndex ) presenceIndex;
     }
 
 

Modified: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/cursor/HBaseUserColumnIndexCursor.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/cursor/HBaseUserColumnIndexCursor.java?rev=912434&r1=912433&r2=912434&view=diff
==============================================================================
--- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/cursor/HBaseUserColumnIndexCursor.java (original)
+++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/cursor/HBaseUserColumnIndexCursor.java Sun Feb 21 22:59:19 2010
@@ -168,30 +168,30 @@
                 {
                     /*
                      * this case is relevant for substring filters with an initial pattern, e.g. (cn=test*)
-                     * - row filter is "^#test.*$" 
+                     * - row filter is "^=test.*$" 
                      */
                     start = indexTable.getScanKey( node.getInitial(), null );
                     stop = indexTable.getScanKey( node.getInitial(), HBaseIndexTable.VALUE_SCAN_STOP );
-                    rowFilterPattern = Pattern.compile( "^#" + Utils.getValuePattern( node, store ) + "$" );
+                    rowFilterPattern = Pattern.compile( "^=" + Utils.getValuePattern( node, store ) + "$" );
                 }
                 else
                 {
                     /*
                      * this case is relevant for substring filters w/o an initial pattern, e.g. (cn=*test*) 
                      * unfortunately we need to scan the whole index, but can set a row filter
-                     * - row filter is "^#.*test.*$" 
+                     * - row filter is "^=.*test.*$" 
                      */
                     start = indexTable.getScanKey( HBaseIndexTable.FULL_SCAN_START, null );
                     stop = indexTable.getScanKey( HBaseIndexTable.FULL_SCAN_STOP, null );
-                    rowFilterPattern = Pattern.compile( "^#" + Utils.getValuePattern( node, store ) + "$" );
+                    rowFilterPattern = Pattern.compile( "^=" + Utils.getValuePattern( node, store ) + "$" );
                 }
             }
             else if ( value != null )
             {
                 /*
                  * this case is relevant for greater than filters (the start value is set by before(IndexEntry)), e.g. (cn>=test)
-                 * - start row is "#test"
-                 * - stop row is "#0xFF"
+                 * - start row is "=test"
+                 * - stop row is "=0xFF"
                  */
                 start = indexTable.getScanKey( value, null );
                 stop = indexTable.getScanKey( HBaseIndexTable.FULL_SCAN_STOP, null );
@@ -203,8 +203,8 @@
                 /*
                  * this case is relevant for less than filters, e.g. (cn<=test)
                  * unfortunately we need to scan the whole index
-                 * - start row is "#0x00"
-                 * - stop row is "#0xFF"
+                 * - start row is "=0x00"
+                 * - stop row is "=0xFF"
                  */
                 start = indexTable.getScanKey( HBaseIndexTable.FULL_SCAN_START, null );
                 stop = indexTable.getScanKey( HBaseIndexTable.FULL_SCAN_STOP, null );
@@ -234,7 +234,7 @@
         if ( iterator.hasNext() )
         {
             Result result = iterator.next();
-            value = indexTable.getValueFromCountKey( result.getRow() );
+            value = indexTable.extractValueFromEqualsKey( result.getRow() );
             candidates = indexTable.getColumnCandidates( result ).iterator();
         }
         else

Modified: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBasePresenceIndex.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBasePresenceIndex.java?rev=912434&r1=912433&r2=912434&view=diff
==============================================================================
--- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBasePresenceIndex.java (original)
+++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBasePresenceIndex.java Sun Feb 21 22:59:19 2010
@@ -27,7 +27,6 @@
 import org.apache.directory.server.core.partition.hbase.HBaseStore;
 import org.apache.directory.server.core.partition.hbase.cursor.HBasePresenceIndexCursor;
 import org.apache.directory.server.core.partition.hbase.table.HBasePresenceIndexTable;
-import org.apache.directory.server.xdbm.IndexCursor;
 
 
 /**
@@ -80,7 +79,7 @@
 
 
     @Override
-    public IndexCursor<String, ServerEntry> forwardCursor( String key ) throws Exception
+    public HBasePresenceIndexCursor forwardCursor( String key ) throws Exception
     {
         return new HBasePresenceIndexCursor( getPresenceIndexTable( key ) );
     }
@@ -108,8 +107,8 @@
         }
         else
         {
-            HBasePresenceIndexTable presenceIndexTable = new HBasePresenceIndexTable( attributeTypeOid, store
-                .getSchemaManager(), store.getTablePrefix(), store.getConfiguration(), getCacheSize() );
+            HBasePresenceIndexTable presenceIndexTable = new HBasePresenceIndexTable( attributeTypeOid, store,
+                getCacheSize() );
             presenceIndexTables.put( attributeTypeOid, presenceIndexTable );
             return presenceIndexTable;
         }

Modified: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserColumnIndex.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserColumnIndex.java?rev=912434&r1=912433&r2=912434&view=diff
==============================================================================
--- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserColumnIndex.java (original)
+++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserColumnIndex.java Sun Feb 21 22:59:19 2010
@@ -20,10 +20,8 @@
 package org.apache.directory.server.core.partition.hbase.index;
 
 
-import org.apache.directory.server.core.entry.ServerEntry;
 import org.apache.directory.server.core.partition.hbase.cursor.HBaseUserColumnIndexCursor;
 import org.apache.directory.server.core.partition.hbase.table.HBaseColumnIndexTable;
-import org.apache.directory.server.xdbm.IndexCursor;
 import org.apache.directory.shared.ldap.filter.SubstringNode;
 
 
@@ -58,27 +56,27 @@
 
 
     @Override
-    public IndexCursor<Object, ServerEntry> forwardCursor( Object value ) throws Exception
+    public HBaseUserColumnIndexCursor forwardCursor( Object value ) throws Exception
     {
         return new HBaseUserColumnIndexCursor( getAttributeId(), value, getIndexTable(), store );
     }
 
 
     @Override
-    public IndexCursor<Object, ServerEntry> forwardCursor() throws Exception
+    public HBaseUserColumnIndexCursor forwardCursor() throws Exception
     {
         return new HBaseUserColumnIndexCursor( getAttributeId(), getIndexTable(), store );
     }
 
 
-    public IndexCursor<Object, ServerEntry> forwardSubstringCursor( SubstringNode node ) throws Exception
+    public HBaseUserColumnIndexCursor forwardSubstringCursor( SubstringNode node ) throws Exception
     {
         return new HBaseUserColumnIndexCursor( getAttributeId(), node, getIndexTable(), store );
     }
 
 
     @Override
-    protected HBaseColumnIndexTable getIndexTable() throws Exception
+    public HBaseColumnIndexTable getIndexTable() throws Exception
     {
         if ( indexTable == null )
         {

Modified: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserIndex.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserIndex.java?rev=912434&r1=912433&r2=912434&view=diff
==============================================================================
--- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserIndex.java (original)
+++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserIndex.java Sun Feb 21 22:59:19 2010
@@ -24,7 +24,7 @@
 
 import org.apache.directory.server.core.entry.ServerEntry;
 import org.apache.directory.server.core.partition.hbase.cursor.HBaseUserIndexReverseCursor;
-import org.apache.directory.server.core.partition.hbase.table.HBaseIndexTableBase;
+import org.apache.directory.server.core.partition.hbase.table.HBaseIndexTable;
 import org.apache.directory.server.core.partition.hbase.xdbmext.IndexSubstringExtension;
 import org.apache.directory.server.xdbm.IndexCursor;
 import org.apache.directory.shared.ldap.filter.SubstringNode;
@@ -37,7 +37,7 @@
  * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
  * @version $Rev$, $Date$
  */
-public abstract class HBaseUserIndex<T extends HBaseIndexTableBase> extends AbstractHBaseIndex<Object, ServerEntry>
+public abstract class HBaseUserIndex<T extends HBaseIndexTable> extends AbstractHBaseIndex<Object, ServerEntry>
     implements IndexSubstringExtension<Object, ServerEntry>
 {
 
@@ -217,6 +217,6 @@
     }
 
 
-    protected abstract T getIndexTable() throws Exception;
+    public abstract T getIndexTable() throws Exception;
 
 }

Modified: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserRowIndex.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserRowIndex.java?rev=912434&r1=912433&r2=912434&view=diff
==============================================================================
--- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserRowIndex.java (original)
+++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/index/HBaseUserRowIndex.java Sun Feb 21 22:59:19 2010
@@ -20,10 +20,8 @@
 package org.apache.directory.server.core.partition.hbase.index;
 
 
-import org.apache.directory.server.core.entry.ServerEntry;
 import org.apache.directory.server.core.partition.hbase.cursor.HBaseUserRowIndexCursor;
 import org.apache.directory.server.core.partition.hbase.table.HBaseRowIndexTable;
-import org.apache.directory.server.xdbm.IndexCursor;
 import org.apache.directory.shared.ldap.filter.SubstringNode;
 
 
@@ -58,32 +56,31 @@
 
 
     @Override
-    public IndexCursor<Object, ServerEntry> forwardCursor( Object value ) throws Exception
+    public HBaseUserRowIndexCursor forwardCursor( Object value ) throws Exception
     {
         return new HBaseUserRowIndexCursor( getAttributeId(), value, getIndexTable(), store );
     }
 
 
     @Override
-    public IndexCursor<Object, ServerEntry> forwardCursor() throws Exception
+    public HBaseUserRowIndexCursor forwardCursor() throws Exception
     {
         return new HBaseUserRowIndexCursor( getAttributeId(), getIndexTable(), store );
     }
 
 
-    public IndexCursor<Object, ServerEntry> forwardSubstringCursor( SubstringNode node ) throws Exception
+    public HBaseUserRowIndexCursor forwardSubstringCursor( SubstringNode node ) throws Exception
     {
         return new HBaseUserRowIndexCursor( getAttributeId(), node, getIndexTable(), store );
     }
 
 
     @Override
-    protected HBaseRowIndexTable getIndexTable() throws Exception
+    public HBaseRowIndexTable getIndexTable() throws Exception
     {
         if ( indexTable == null )
         {
-            indexTable = new HBaseRowIndexTable( getAttributeId(), store.getSchemaManager(), store.getTablePrefix(),
-                store.getConfiguration(), getCacheSize() );
+            indexTable = new HBaseRowIndexTable( getAttributeId(), store, getCacheSize() );
         }
         return indexTable;
     }

Added: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/IndexBuilder.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/IndexBuilder.java?rev=912434&view=auto
==============================================================================
--- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/IndexBuilder.java (added)
+++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/IndexBuilder.java Sun Feb 21 22:59:19 2010
@@ -0,0 +1,173 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ *
+ */
+package org.apache.directory.server.core.partition.hbase.mapreduce;
+
+
+import java.io.IOException;
+
+import org.apache.directory.server.core.entry.ServerEntry;
+import org.apache.directory.server.core.partition.hbase.HBaseStore;
+import org.apache.directory.server.core.partition.hbase.index.HBaseUserColumnIndex;
+import org.apache.directory.server.core.partition.hbase.index.HBaseUserIndex;
+import org.apache.directory.server.core.partition.hbase.index.HBaseUserRowIndex;
+import org.apache.directory.server.core.partition.hbase.table.HBaseIndexTable;
+import org.apache.directory.server.core.partition.hbase.table.HBaseMasterTable;
+import org.apache.directory.shared.ldap.entry.EntryAttribute;
+import org.apache.directory.shared.ldap.entry.Value;
+import org.apache.directory.shared.ldap.name.LdapDN;
+import org.apache.directory.shared.ldap.schema.SchemaManager;
+import org.apache.directory.shared.ldap.schema.loader.ldif.JarLdifSchemaLoader;
+import org.apache.directory.shared.ldap.schema.manager.impl.DefaultSchemaManager;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
+
+/**
+ * Mapper that builds indices.
+ *
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ * @version $Rev$, $Date$
+ */
+public class IndexBuilder extends TableMapper<IntWritable, Text>
+{
+
+    private static enum Counters
+    {
+        INDEXED_ENTRIES
+    }
+
+    public static final String COLUMN_INDICES = "org.apache.directory.column.indices";
+    public static final String ROW_INDICES = "org.apache.directory.row.indices";
+    public static final String SUFFIX = "org.apache.directory.suffix";
+    public static final String TABLE_PREFIX = "org.apache.directory.table.prefix";
+
+    private HBaseStore store;
+
+
+    protected void setup( Mapper<ImmutableBytesWritable, Result, IntWritable, Text>.Context context )
+        throws IOException, InterruptedException
+    {
+        try
+        {
+            JarLdifSchemaLoader schemaLoader = new JarLdifSchemaLoader();
+            SchemaManager schemaManager = new DefaultSchemaManager( schemaLoader );
+            schemaManager.loadAllEnabled();
+
+            LdapDN suffixDn = new LdapDN( context.getConfiguration().get( SUFFIX ) );
+            suffixDn.normalize( schemaManager.getNormalizerMapping() );
+
+            store = new HBaseStore();
+
+            String columnIndices = context.getConfiguration().get( COLUMN_INDICES );
+            String[] columnIndicesSplitted = columnIndices.split( "," );
+            for ( String columnIndex : columnIndicesSplitted )
+            {
+                HBaseUserColumnIndex index = new HBaseUserColumnIndex();
+                String oid = schemaManager.getAttributeTypeRegistry().getOidByName( columnIndex );
+                index.setAttributeId( oid );
+                index.setStore( store );
+                store.addIndex( index );
+
+            }
+            String rowIndices = context.getConfiguration().get( ROW_INDICES );
+            String[] rowIndicesSplitted = rowIndices.split( "," );
+            for ( String rowIndex : rowIndicesSplitted )
+            {
+                HBaseUserRowIndex index = new HBaseUserRowIndex();
+                String oid = schemaManager.getAttributeTypeRegistry().getOidByName( rowIndex );
+                index.setAttributeId( oid );
+                index.setStore( store );
+                store.addIndex( index );
+            }
+
+            store.setSuffixDn( suffixDn.getName() );
+            store.setCacheSize( 100 );
+            String tablePrefix = context.getConfiguration().get( TABLE_PREFIX );
+            store.setTablePrefix( tablePrefix );
+            store.init( schemaManager );
+            store.getConfiguration().setBoolean( HBaseStore.STRONG_CONSISTENCY_PROPERTY, false );
+            store.getConfiguration().setBoolean( HBaseMasterTable.MAINTAIN_COUNTERS_PROPERTY, true );
+        }
+        catch ( Throwable e )
+        {
+            e.printStackTrace();
+            throw new IOException( e );
+        }
+    }
+
+
+    protected void cleanup( Mapper<ImmutableBytesWritable, Result, IntWritable, Text>.Context context )
+        throws IOException, InterruptedException
+    {
+        try
+        {
+            store.destroy();
+        }
+        catch ( Throwable e )
+        {
+            e.printStackTrace();
+            throw new IOException( e );
+        }
+    }
+
+
+    @Override
+    public void map( ImmutableBytesWritable key, Result result, Context context ) throws IOException,
+        InterruptedException
+    {
+        try
+        {
+            // write to tree table
+            Long id = Bytes.toLong( key.get() );
+            ServerEntry entry = store.getMasterTable().convertToServerEntry( id, result );
+            store.getMasterTable().addToTree( id, entry );
+
+            // write index tables
+            for ( EntryAttribute attribute : entry )
+            {
+                String attr = attribute.getId();
+                String oid = store.getSchemaManager().getAttributeTypeRegistry().getOidByName( attr );
+                if ( store.hasUserIndexOn( oid ) )
+                {
+                    HBaseUserIndex<HBaseIndexTable> index = store.getUserIndex( oid );
+                    for ( Value<?> value : attribute )
+                    {
+                        index.add( value.getBytes(), id );
+                    }
+                    store.getPresenceIndex().add( oid, id );
+                }
+            }
+
+            context.getCounter( Counters.INDEXED_ENTRIES ).increment( 1 );
+        }
+        catch ( Exception e )
+        {
+            System.err.println( "Error indexing entry id=" + Bytes.toLong( key.get() ) );
+            System.err.println( ">>>" + result + "<<<" );
+            e.printStackTrace();
+        }
+    }
+
+}

Added: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifImporter.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifImporter.java?rev=912434&view=auto
==============================================================================
--- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifImporter.java (added)
+++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifImporter.java Sun Feb 21 22:59:19 2010
@@ -0,0 +1,180 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ *
+ */
+package org.apache.directory.server.core.partition.hbase.mapreduce;
+
+
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+import javax.naming.NamingException;
+
+import org.apache.directory.server.constants.ServerDNConstants;
+import org.apache.directory.server.core.entry.DefaultServerEntry;
+import org.apache.directory.server.core.entry.ServerEntry;
+import org.apache.directory.server.core.partition.hbase.HBaseStore;
+import org.apache.directory.server.core.partition.hbase.table.HBaseMasterTable;
+import org.apache.directory.shared.ldap.constants.SchemaConstants;
+import org.apache.directory.shared.ldap.csn.CsnFactory;
+import org.apache.directory.shared.ldap.ldif.LdifEntry;
+import org.apache.directory.shared.ldap.ldif.LdifReader;
+import org.apache.directory.shared.ldap.name.LdapDN;
+import org.apache.directory.shared.ldap.schema.SchemaManager;
+import org.apache.directory.shared.ldap.schema.loader.ldif.JarLdifSchemaLoader;
+import org.apache.directory.shared.ldap.schema.manager.impl.DefaultSchemaManager;
+import org.apache.directory.shared.ldap.util.DateUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+
+
+/**
+ * Mapper that imports LDIF.
+ * <p>
+ * Each input value is parsed as LDIF text. Every contained entry whose DN has
+ * exactly the configured number of name components
+ * ({@link #NAME_COMPONENT_COUNT}) is converted to a server entry, enriched
+ * with operational attributes and added to the master table of an
+ * {@link HBaseStore}. Entries with a different DN size are skipped.
+ * The mapper does not write any output key/value pairs; it only increments
+ * the IMPORTED_ENTRIES counter.
+ *
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ * @version $Rev$, $Date$
+ */
+public class LdifImporter extends Mapper<Object, Text, IntWritable, Text>
+{
+
+    /** Job counters reported by this mapper. */
+    private static enum Counters
+    {
+        IMPORTED_ENTRIES
+    }
+
+    /** Configuration key: number of name components a DN must have to be imported (read with default "0"). */
+    public static final String NAME_COMPONENT_COUNT = "org.apache.directory.name.component.count";
+
+    /** Configuration key: the suffix DN passed to the store. */
+    public static final String SUFFIX = "org.apache.directory.suffix";
+
+    /** Configuration key: the table prefix passed to the store. */
+    public static final String TABLE_PREFIX = "org.apache.directory.table.prefix";
+
+    private static final CsnFactory CSN_FACTORY = new CsnFactory( 0 );
+    private LdifReader ldifReader;
+    private HBaseStore store;
+    private int count;
+
+
+    /**
+     * Reads the job configuration, loads the schema and initializes the store.
+     *
+     * @param context the mapper context providing the job configuration
+     * @throws IOException wrapping any failure during initialization
+     * @throws InterruptedException declared by the overridden method
+     */
+    @Override
+    protected void setup( Mapper<Object, Text, IntWritable, Text>.Context context ) throws IOException,
+        InterruptedException
+    {
+        try
+        {
+            String countAsString = context.getConfiguration().get( NAME_COMPONENT_COUNT, "0" );
+            count = Integer.parseInt( countAsString );
+
+            ldifReader = new LdifReader();
+
+            JarLdifSchemaLoader schemaLoader = new JarLdifSchemaLoader();
+            SchemaManager schemaManager = new DefaultSchemaManager( schemaLoader );
+            schemaManager.loadAllEnabled();
+
+            LdapDN suffixDn = new LdapDN( context.getConfiguration().get( SUFFIX ) );
+            suffixDn.normalize( schemaManager.getNormalizerMapping() );
+
+            store = new HBaseStore();
+            store.setSuffixDn( suffixDn.getName() );
+            store.setCacheSize( 100 );
+            store.setTablePrefix( context.getConfiguration().get( TABLE_PREFIX ) );
+            store.init( schemaManager );
+            // NOTE(review): both flags are set after store.init(); if init() already
+            // creates components that read them, the values set here come too late
+            // for those components - confirm against HBaseStore.init().
+            store.getConfiguration().setBoolean( HBaseStore.STRONG_CONSISTENCY_PROPERTY, false );
+            store.getConfiguration().setBoolean( HBaseMasterTable.MAINTAIN_COUNTERS_PROPERTY, false );
+        }
+        catch ( Throwable e )
+        {
+            e.printStackTrace();
+            throw new IOException( e );
+        }
+    }
+
+
+    /**
+     * Releases the store and the LDIF reader.
+     * <p>
+     * Both resources are released independently: a failure while destroying
+     * the store no longer prevents the reader from being closed, and a
+     * resource that was never created is skipped. The first failure, if any,
+     * is reported after both release attempts have been made.
+     *
+     * @param context the mapper context (unused)
+     * @throws IOException wrapping the first failure that occurred while releasing
+     * @throws InterruptedException declared by the overridden method
+     */
+    @Override
+    protected void cleanup( Mapper<Object, Text, IntWritable, Text>.Context context ) throws IOException,
+        InterruptedException
+    {
+        Throwable failure = null;
+
+        try
+        {
+            if ( store != null )
+            {
+                store.destroy();
+            }
+        }
+        catch ( Throwable e )
+        {
+            e.printStackTrace();
+            failure = e;
+        }
+
+        try
+        {
+            if ( ldifReader != null )
+            {
+                ldifReader.close();
+            }
+        }
+        catch ( Throwable e )
+        {
+            e.printStackTrace();
+            if ( failure == null )
+            {
+                failure = e;
+            }
+        }
+
+        if ( failure != null )
+        {
+            throw new IOException( failure );
+        }
+    }
+
+
+    /**
+     * Parses the given value as LDIF and imports every entry whose DN size
+     * equals the configured name component count.
+     * <p>
+     * A record that cannot be parsed is reported on stderr and skipped; a
+     * failure while importing a parsed entry aborts the call with an
+     * {@link IOException}.
+     *
+     * @param key the input key (unused)
+     * @param value the LDIF text to import
+     * @param context the mapper context, used for the counter
+     * @throws IOException if an entry could not be imported
+     * @throws InterruptedException declared by the overridden method
+     */
+    @Override
+    public void map( Object key, Text value, Context context ) throws IOException, InterruptedException
+    {
+        String record = value.toString();
+        // prepend the version
+        //record = "version: 1\n\n" + record;
+
+        try
+        {
+            List<LdifEntry> ldifEntries = ldifReader.parseLdif( record );
+            for ( LdifEntry ldifEntry : ldifEntries )
+            {
+                if ( ldifEntry.isEntry() )
+                {
+                    LdapDN dn = ldifEntry.getDn();
+                    int size = dn.size();
+                    // only entries of the configured depth are imported by this run
+                    if ( size == count )
+                    {
+                        importLdifEntry( ldifEntry, context );
+                    }
+                }
+            }
+        }
+        catch ( NamingException e )
+        {
+            // best effort: report the broken record and continue with the next one
+            System.err.println( "Error parsing LDIF: " );
+            System.err.println( ">>>" + record + "<<<" );
+            e.printStackTrace();
+        }
+    }
+
+
+    /**
+     * Converts the LDIF entry to a server entry, adds the entryUUID, entryCSN,
+     * creatorsName and createTimestamp attributes, writes it to the master
+     * table and increments the IMPORTED_ENTRIES counter.
+     *
+     * @param ldifEntry the LDIF entry to import
+     * @param context the mapper context, used for the counter
+     * @throws IOException wrapping any failure during conversion or write
+     */
+    private void importLdifEntry( LdifEntry ldifEntry, Context context ) throws IOException
+    {
+        try
+        {
+            // convert LDIF entry to server entry
+            ServerEntry entry = new DefaultServerEntry( store.getSchemaManager(), ldifEntry.getEntry() );
+
+            // add operational attributes
+            entry.put( SchemaConstants.ENTRY_UUID_AT, UUID.randomUUID().toString() );
+            entry.put( SchemaConstants.ENTRY_CSN_AT, CSN_FACTORY.newInstance().toString() );
+            entry.put( SchemaConstants.CREATORS_NAME_AT, ServerDNConstants.ADMIN_SYSTEM_DN_NORMALIZED );
+            entry.put( SchemaConstants.CREATE_TIMESTAMP_AT, DateUtils.getGeneralizedTime() );
+
+            // write to HBase
+            store.getMasterTable().add( entry );
+
+            context.getCounter( Counters.IMPORTED_ENTRIES ).increment( 1 );
+        }
+        catch ( Throwable e )
+        {
+            System.err.println( "Error importing Entry: " );
+            System.err.println( ">>>" + ldifEntry + "<<<" );
+            e.printStackTrace();
+            throw new IOException( e );
+        }
+    }
+
+}

Added: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifInputFormat.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifInputFormat.java?rev=912434&view=auto
==============================================================================
--- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifInputFormat.java (added)
+++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifInputFormat.java Sun Feb 21 22:59:19 2010
@@ -0,0 +1,49 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ *
+ */
+package org.apache.directory.server.core.partition.hbase.mapreduce;
+
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+
+
+/**
+ * {@link InputFormat} for LDIF files that delegates record splitting to
+ * {@link LdifRecordReader}: keys are positions in the file, values are
+ * complete LDIF records. Everything else is inherited from
+ * {@link TextInputFormat}.
+ *
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ * @version $Rev$, $Date$
+ */
+public class LdifInputFormat extends TextInputFormat
+{
+
+    /**
+     * Creates a fresh, not yet initialized {@link LdifRecordReader}.
+     *
+     * @param split the split to read (unused here)
+     * @param context the task attempt context (unused here)
+     * @return a new LDIF record reader
+     */
+    @Override
+    public RecordReader<LongWritable, Text> createRecordReader( InputSplit split, TaskAttemptContext context )
+    {
+        final RecordReader<LongWritable, Text> ldifRecordReader = new LdifRecordReader();
+        return ldifRecordReader;
+    }
+
+}

Added: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifRecordReader.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifRecordReader.java?rev=912434&view=auto
==============================================================================
--- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifRecordReader.java (added)
+++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/mapreduce/LdifRecordReader.java Sun Feb 21 22:59:19 2010
@@ -0,0 +1,216 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing,
+ *   software distributed under the License is distributed on an
+ *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *   KIND, either express or implied.  See the License for the
+ *   specific language governing permissions and limitations
+ *   under the License.
+ *
+ */
+
+package org.apache.directory.server.core.partition.hbase.mapreduce;
+
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.util.LineReader;
+
+
+/**
+ * Treats keys as offset in file and value as LDIF record.
+ * <p>
+ * A record starts at a line beginning with "dn:" and ends with the next
+ * empty line (or at end of input). Lines outside a record are skipped. A
+ * record that starts inside the split is read to its end even if it extends
+ * beyond the end of the split.
+ *
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ * @version $Rev$, $Date$
+ */
+public class LdifRecordReader extends RecordReader<LongWritable, Text>
+{
+    private static final Log LOG = LogFactory.getLog( LdifRecordReader.class );
+
+    private CompressionCodecFactory compressionCodecs = null;
+    private long start;
+    private long pos;
+    private long end;
+    private LineReader in;
+
+    private LongWritable key = null;
+    private Text value = null;
+
+    /** The "dn:" line of the most recently started record, only used for debug logging. */
+    private String dn = null;
+
+
+    /**
+     * Opens the file of the split and positions the reader at the first
+     * complete line of the split.
+     *
+     * @param genericSplit the split to read, must be a {@link FileSplit}
+     * @param context the task attempt context providing the configuration
+     * @throws IOException if the file cannot be opened or positioned
+     */
+    @Override
+    public void initialize( InputSplit genericSplit, TaskAttemptContext context ) throws IOException
+    {
+        FileSplit split = ( FileSplit ) genericSplit;
+        Configuration job = context.getConfiguration();
+        start = split.getStart();
+        end = start + split.getLength();
+        final Path file = split.getPath();
+        compressionCodecs = new CompressionCodecFactory( job );
+        final CompressionCodec codec = compressionCodecs.getCodec( file );
+
+        // open the file and seek to the start of the split
+        FileSystem fs = file.getFileSystem( job );
+        FSDataInputStream fileIn = fs.open( file );
+        boolean initialized = false;
+        try
+        {
+            boolean skipFirstLine = false;
+            if ( codec != null )
+            {
+                // compressed input: read until the stream is exhausted
+                in = new LineReader( codec.createInputStream( fileIn ), job );
+                end = Long.MAX_VALUE;
+            }
+            else
+            {
+                if ( start != 0 )
+                {
+                    // step back one byte so that the line containing the split
+                    // start is consumed and discarded below
+                    skipFirstLine = true;
+                    --start;
+                    fileIn.seek( start );
+                }
+                in = new LineReader( fileIn, job );
+            }
+            if ( skipFirstLine )
+            { // skip first line and re-establish "start".
+                start += in.readLine( new Text(), 0, ( int ) Math.min( ( long ) Integer.MAX_VALUE, end - start ) );
+            }
+            this.pos = start;
+            initialized = true;
+        }
+        finally
+        {
+            if ( !initialized )
+            {
+                // don't leak the open stream if positioning failed
+                in = null;
+                fileIn.close();
+            }
+        }
+
+        LOG.debug( "LdifRecordReader: start=" + start + ", end=" + end );
+    }
+
+
+    /**
+     * Reads the next LDIF record.
+     * <p>
+     * On success the current key is the position of the record's "dn:" line
+     * and the current value is the record text, each line terminated by '\n'.
+     * When no further record is found, key and value are reset to null.
+     *
+     * @return true if a record was read, false if there are no more records
+     * @throws IOException if reading from the underlying stream fails
+     */
+    @Override
+    public boolean nextKeyValue() throws IOException
+    {
+        if ( key == null )
+        {
+            key = new LongWritable();
+        }
+        key.set( pos );
+
+        if ( value == null )
+        {
+            value = new Text();
+        }
+        value.clear();
+
+        boolean withinRecord = false;
+        StringBuilder sb = new StringBuilder();
+        // one buffer for all lines of this call; readLine() resets it before filling
+        Text temp = new Text();
+        // keep reading past "end" while a record that started before it is still open
+        while ( pos < end || withinRecord )
+        {
+            int size = in.readLine( temp );
+            if ( size == 0 )
+            {
+                // end of file
+                break;
+            }
+
+            String line = temp.toString();
+            pos += size;
+
+            // record must start with "dn:"
+            if ( !withinRecord )
+            {
+                if ( line.startsWith( "dn:" ) )
+                {
+                    // pos was already advanced, so step back to the start of this line
+                    key.set( pos - size );
+                    withinRecord = true;
+
+                    if ( dn == null )
+                    {
+                        LOG.debug( "First record at pos " + key + ": " + line );
+                    }
+                    dn = line;
+                }
+                else
+                {
+                    continue;
+                }
+            }
+
+            sb.append( line );
+            sb.append( '\n' );
+
+            // record ends with an empty line
+            if ( line.trim().isEmpty() && !line.startsWith( " " ) )
+            {
+                withinRecord = false;
+                break;
+            }
+        }
+        if ( sb.length() == 0 )
+        {
+            LOG.debug( "Last record at pos " + key + ": " + dn );
+            key = null;
+            value = null;
+            return false;
+        }
+        else
+        {
+            value.set( sb.toString() );
+            return true;
+        }
+    }
+
+
+    @Override
+    public LongWritable getCurrentKey()
+    {
+        return key;
+    }
+
+
+    @Override
+    public Text getCurrentValue()
+    {
+        return value;
+    }
+
+
+    /**
+     * Get the progress within the split
+     *
+     * @return the consumed fraction of the split, capped at 1.0; 0.0 for an empty split
+     */
+    @Override
+    public float getProgress()
+    {
+        if ( start == end )
+        {
+            return 0.0f;
+        }
+        else
+        {
+            return Math.min( 1.0f, ( pos - start ) / ( float ) ( end - start ) );
+        }
+    }
+
+
+    /**
+     * Closes the underlying line reader, if one was opened.
+     *
+     * @throws IOException if closing the reader fails
+     */
+    @Override
+    public synchronized void close() throws IOException
+    {
+        LOG.debug( "Close record at pos " + key + ": " + dn );
+        if ( in != null )
+        {
+            in.close();
+        }
+    }
+}

Modified: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseColumnIndexTable.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseColumnIndexTable.java?rev=912434&r1=912433&r2=912434&view=diff
==============================================================================
--- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseColumnIndexTable.java (original)
+++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseColumnIndexTable.java Sun Feb 21 22:59:19 2010
@@ -25,6 +25,7 @@
 import java.util.NavigableMap;
 
 import org.apache.directory.server.core.partition.hbase.Cache;
+import org.apache.directory.shared.ldap.schema.AttributeType;
 import org.apache.directory.shared.ldap.schema.SchemaManager;
 import org.apache.directory.shared.ldap.util.Base64;
 import org.apache.directory.shared.ldap.util.ByteBuffer;
@@ -64,13 +65,49 @@
 
     public int count( Object value ) throws Exception
     {
-        byte[] countKey = getCountKey( value );
+        byte[] countKey = getEqualsKey( value );
         Info info = fetchInfo( countKey );
         if ( info == null )
         {
             return 0;
         }
-        return info.count.intValue();
+        return info.candidates.size();
+    }
+
+
+    /**
+     * Gets the equals key, i.e. the row key used for exact-match lookups.
+     * The key has the following syntax:
+     *   <pre>  =value</pre>
+     * where <code>value</code> is the normalized value.
+     *
+     * @param value the value, may be null
+     * @return the equals row key for the value, or null if the value is null
+     * @throws Exception if normalizing the value fails
+     */
+    private byte[] getEqualsKey( Object value ) throws Exception
+    {
+        if ( value == null )
+        {
+            return null;
+        }
+
+        ByteBuffer bb = new ByteBuffer();
+
+        bb.append( '=' );
+
+        byte[] normValue = getNormalized( value );
+        bb.append( normValue );
+
+        return bb.copyOfUsedBytes();
+    }
+
+
+    public Object extractValueFromEqualsKey( byte[] row ) throws Exception
+    {
+        byte[] value = Bytes.tail( row, row.length - 1 );
+        AttributeType at = schemaManager.getAttributeTypeRegistry().lookup( attributeTypeOid );
+        return at.getSyntax().isHumanReadable() ? Bytes.toString( value ) : value;
     }
 
 
@@ -78,7 +115,7 @@
     {
         ByteBuffer bb = new ByteBuffer();
 
-        bb.append( '#' );
+        bb.append( '=' );
 
         // add value
         // there are special values to support attribute scan
@@ -112,8 +149,8 @@
 
     public List<Long> getColumnCandidates( Object value ) throws Exception
     {
-        byte[] countKey = getCountKey( value );
-        Info info = fetchInfo( countKey );
+        byte[] equalsKey = getEqualsKey( value );
+        Info info = fetchInfo( equalsKey );
         if ( info == null )
         {
             return null;
@@ -140,8 +177,8 @@
      */
     public boolean exists( Object value, Long id ) throws Exception
     {
-        byte[] countKey = getCountKey( value );
-        Info info = fetchInfo( countKey );
+        byte[] equalsKey = getEqualsKey( value );
+        Info info = fetchInfo( equalsKey );
         if ( info == null )
         {
             return false;
@@ -180,7 +217,7 @@
         String key = String.valueOf( Base64.encode( row ) );
 
         Info info = new Info();
-        info.value = getValueFromCountKey( row );
+        info.value = extractValueFromEqualsKey( row );
         infoCache.put( key, info );
 
         if ( result.isEmpty() )
@@ -196,10 +233,6 @@
             {
                 info.candidates.add( Bytes.toLong( qualifier ) );
             }
-            else if ( Bytes.equals( COUNT_QUALIFIER, qualifier ) )
-            {
-                info.count = Bytes.toLong( result.getFamilyMap( INFO_FAMILY ).get( qualifier ) );
-            }
         }
         return info;
     }
@@ -207,23 +240,11 @@
 
     public void add( byte[] value, Long id ) throws Exception
     {
-        // exact match (attribute=value): #value -> count, id
-        // check first if the index already exists because we won't increment the index count
-        byte[] exactCountRow = getCountKey( value );
-        Get exactGet = new Get( exactCountRow );
-        exactGet.addColumn( INFO_FAMILY, Bytes.toBytes( id ) );
-        if ( !HBaseTableHelper.exists( getIndexTablePool(), indexTableName, exactGet ) )
-        {
-            // get+put+put is not atomic!
-            Put exactPut = new Put( exactCountRow );
-            //exactPut.setWriteToWAL( false );
-            exactPut.add( INFO_FAMILY, Bytes.toBytes( id ), Bytes.toBytes( id ) );
-            HBaseTableHelper.put( getIndexTablePool(), indexTableName, exactPut );
-
-            // increment exact match count: #value -> count
-            HBaseTableHelper.increment( getIndexTablePool(), indexTableName, exactCountRow, INFO_FAMILY,
-                COUNT_QUALIFIER );
-        }
+        // exact match (attribute=value): =value -> id
+        byte[] equalsKey = getEqualsKey( value );
+        Put exactPut = new Put( equalsKey );
+        exactPut.add( INFO_FAMILY, Bytes.toBytes( id ), Bytes.toBytes( id ) );
+        HBaseTableHelper.put( getIndexTablePool(), indexTableName, exactPut );
 
         // TODO: optimize - don't need to clear the ẃhole cache
         infoCache.clear();
@@ -234,22 +255,11 @@
 
     public void drop( byte[] value, Long id ) throws Exception
     {
-        // exact match (attribute=value): #value -> count, id
-        // check first if the index exists because we won't decrement the index count otherwise
-        byte[] exactCountRow = getCountKey( value );
-        Get exactGet = new Get( exactCountRow );
-        exactGet.addColumn( INFO_FAMILY, Bytes.toBytes( id ) );
-        if ( HBaseTableHelper.exists( getIndexTablePool(), indexTableName, exactGet ) )
-        {
-            Delete exactDel = new Delete( exactCountRow );
-            exactDel.deleteColumn( INFO_FAMILY, Bytes.toBytes( id ) );
-            HBaseTableHelper.delete( getIndexTablePool(), indexTableName, exactDel );
-
-            // decrement exact match count: #value -> count
-            HBaseTableHelper.decrement( getIndexTablePool(), indexTableName, exactCountRow, INFO_FAMILY,
-                COUNT_QUALIFIER );
-            // TODO: delete column if count is 0?
-        }
+        // exact match (attribute=value): =value -> id
+        byte[] equalsKey = getEqualsKey( value );
+        Delete exactDel = new Delete( equalsKey );
+        exactDel.deleteColumn( INFO_FAMILY, Bytes.toBytes( id ) );
+        HBaseTableHelper.delete( getIndexTablePool(), indexTableName, exactDel );
 
         // TODO: optimize - don't need to clear the ẃhole cache
         infoCache.clear();
@@ -260,7 +270,6 @@
     class Info
     {
         Object value;
-        Long count = 0L;
         List<Long> candidates = new ArrayList<Long>();
     }
 

Modified: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseIndexTable.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseIndexTable.java?rev=912434&r1=912433&r2=912434&view=diff
==============================================================================
--- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseIndexTable.java (original)
+++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseIndexTable.java Sun Feb 21 22:59:19 2010
@@ -36,7 +36,6 @@
 
     public static final byte[] INFO_FAMILY = Bytes.toBytes( "info" );
     public static final byte[] ID_QUALIFIER = Bytes.toBytes( "id" );
-    public static final byte[] COUNT_QUALIFIER = Bytes.toBytes( "count" );
     public static final byte[] VALUE_SCAN_START = new byte[]
         { 0x00 };
     public static final byte[] VALUE_SCAN_STOP = new byte[]
@@ -66,4 +65,10 @@
 
     public abstract ResultScanner getScanner( Scan scan ) throws Exception;
 
+
+    public abstract void add( byte[] value, Long id ) throws Exception;
+
+
+    public abstract void drop( byte[] value, Long id ) throws Exception;
+
 }
\ No newline at end of file

Modified: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseIndexTableBase.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseIndexTableBase.java?rev=912434&r1=912433&r2=912434&view=diff
==============================================================================
--- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseIndexTableBase.java (original)
+++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseIndexTableBase.java Sun Feb 21 22:59:19 2010
@@ -25,9 +25,7 @@
 import org.apache.directory.shared.ldap.entry.client.ClientBinaryValue;
 import org.apache.directory.shared.ldap.schema.AttributeType;
 import org.apache.directory.shared.ldap.schema.SchemaManager;
-import org.apache.directory.shared.ldap.util.ByteBuffer;
 import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HTablePool;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -44,10 +42,10 @@
 
     protected String attributeTypeOid;
     protected String indexTableName;
-    private HTablePool indexTablePool;
+    private HBaseTablePool indexTablePool;
     protected SchemaManager schemaManager;
     protected HBaseConfiguration configuration;
-    protected Cache<Object, Long> countCache;
+    protected Cache<Object, Integer> countCache;
     protected Cache<Object, Boolean> existsCache;
 
 
@@ -59,13 +57,17 @@
         this.configuration = configuration;
         String name = schemaManager.getGlobalOidRegistry().getPrimaryName( attributeTypeOid );
         this.indexTableName = tablePrefix + "index_" + name;
-        this.countCache = new Cache<Object, Long>( cacheSize );
+        this.countCache = new Cache<Object, Integer>( cacheSize );
         this.existsCache = new Cache<Object, Boolean>( cacheSize );
     }
 
 
     public void close() throws Exception
     {
+        if ( indexTablePool != null )
+        {
+            indexTablePool.close();
+        }
     }
 
 
@@ -75,12 +77,6 @@
     }
 
 
-    public abstract void add( byte[] value, Long id ) throws Exception;
-
-
-    public abstract void drop( byte[] value, Long id ) throws Exception;
-
-
     protected byte[] getNormalized( Object value ) throws Exception
     {
         AttributeType at = schemaManager.getAttributeTypeRegistry().lookup( attributeTypeOid );
@@ -113,51 +109,15 @@
     }
 
 
-    protected HTablePool getIndexTablePool() throws Exception
+    protected HBaseTablePool getIndexTablePool() throws Exception
     {
         if ( indexTablePool == null )
         {
             // ensure table is created
             HBaseTableHelper.createTable( configuration, indexTableName, INFO_FAMILY );
-            indexTablePool = new HTablePool( configuration, 16 );
+            indexTablePool = new HBaseTablePool( indexTableName, configuration );
         }
         return indexTablePool;
     }
 
-
-    /**
-     * Gets the count key. 
-     * The key has the following syntax:
-     *   <pre>  #value</pre>
-     * where <code>value</code> is the normalized value.
-     *
-     * @param value the value
-     * @return the count row key for the value
-     * @throws Exception
-     */
-    protected byte[] getCountKey( Object value ) throws Exception
-    {
-        if ( value == null )
-        {
-            return null;
-        }
-
-        ByteBuffer bb = new ByteBuffer();
-
-        bb.append( '#' );
-
-        byte[] normValue = getNormalized( value );
-        bb.append( normValue );
-
-        return bb.copyOfUsedBytes();
-    }
-
-
-    public Object getValueFromCountKey( byte[] row ) throws Exception
-    {
-        byte[] value = Bytes.tail( row, row.length - 1 );
-        AttributeType at = schemaManager.getAttributeTypeRegistry().lookup( attributeTypeOid );
-        return at.getSyntax().isHumanReadable() ? Bytes.toString( value ) : value;
-    }
-
 }
\ No newline at end of file

Modified: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseMasterTable.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseMasterTable.java?rev=912434&r1=912433&r2=912434&view=diff
==============================================================================
--- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseMasterTable.java (original)
+++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseMasterTable.java Sun Feb 21 22:59:19 2010
@@ -42,7 +42,6 @@
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTablePool;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -86,14 +85,16 @@
 
     public static final String MASTER_TABLE = "master";
     public static final String TREE_TABLE = "tree";
+    public static final String MAINTAIN_COUNTERS_PROPERTY = "org.apache.directory.maintain.counters";
 
     private HBaseConfiguration configuration;
+    private boolean maintainCounters;
     private SchemaManager schemaManager;
     private LdapDN suffixDn;
 
-    private HTablePool masterTablePool;
+    private HBaseTablePool masterTablePool;
     private String masterTableName;
-    private HTablePool treeTablePool;
+    private HBaseTablePool treeTablePool;
     private String treeTableName;
     private MasterTreeInfo suffixMti;
 
@@ -121,6 +122,7 @@
         this.schemaManager = store.getSchemaManager();
         this.suffixDn = store.getSuffix();
         this.configuration = store.getConfiguration();
+        this.maintainCounters = configuration.getBoolean( MAINTAIN_COUNTERS_PROPERTY, true );
         this.masterTableName = store.getTablePrefix() + MASTER_TABLE;
         this.treeTableName = store.getTablePrefix() + TREE_TABLE;
         this.suffixMti = new MasterTreeInfo( ROOT_ID, suffixDn.getNormName(), null );
@@ -141,27 +143,40 @@
 
     public void close() throws Exception
     {
+        if ( masterTablePool != null )
+        {
+            masterTablePool.close();
+        }
+        if ( treeTablePool != null )
+        {
+            treeTablePool.close();
+        }
     }
 
 
     public Long add( ServerEntry entry ) throws Exception
     {
+        Long id = addToMaster( entry );
+        addToTree( id, entry );
+        return id;
+    }
+
+
+    public Long addToMaster( ServerEntry entry ) throws Exception
+    {
         Long id = nextId();
         List<Long> parentIds = fetchParentIds( entry.getDn(), false );
         String upRdn;
         String normRdn;
-        MasterTreeInfo treeTableKey;
         if ( entry.getDn().equals( suffixDn ) )
         {
             upRdn = entry.getDn().getName();
             normRdn = entry.getDn().getNormName();
-            treeTableKey = new MasterTreeInfo( parentIds.get( 0 ), normRdn, upRdn );
         }
         else
         {
             upRdn = entry.getDn().getRdn().getUpName();
             normRdn = entry.getDn().getRdn().getNormName();
-            treeTableKey = new MasterTreeInfo( parentIds.get( 0 ), normRdn, upRdn );
         }
 
         // put to master and tree table
@@ -169,12 +184,9 @@
         masterPut.add( TREE_INFO_FAMILY, PARENT_ID_QUALIFIER, Bytes.toBytes( parentIds.get( 0 ) ) );
         masterPut.add( TREE_INFO_FAMILY, UP_RDN_QUALIFIER, Bytes.toBytes( upRdn ) );
         masterPut.add( TREE_INFO_FAMILY, NORM_RDN_QUALIFIER, Bytes.toBytes( normRdn ) );
-        Put treePut = new Put( treeTableKey.treeTableKey );
-        treePut.add( TREE_INFO_FAMILY, ID_QUALIFIER, Bytes.toBytes( id ) );
         for ( EntryAttribute attribute : entry )
         {
             String attr = attribute.getUpId();
-            String attrOid = ( ( ServerAttribute ) attribute ).getAttributeType().getOid();
             for ( int i = 0; i < attribute.size(); i++ )
             {
                 Value<?> value = attribute.get( i );
@@ -184,6 +196,42 @@
                 // objectClass1 -> top
                 masterPut.add( UP_ATTRIBUTES_FAMILY, Bytes.add( Bytes.toBytes( attr ), Bytes.toBytes( i ) ), value
                     .getBytes() );
+            }
+        }
+        HBaseTableHelper.put( getMasterTablePool(), masterTableName, masterPut );
+
+        return id;
+    }
+
+
+    public Long addToTree( Long id, ServerEntry entry ) throws Exception
+    {
+        List<Long> parentIds = fetchParentIds( entry.getDn(), false );
+        String upRdn;
+        String normRdn;
+        MasterTreeInfo treeTableKey;
+        if ( entry.getDn().equals( suffixDn ) )
+        {
+            upRdn = entry.getDn().getName();
+            normRdn = entry.getDn().getNormName();
+            treeTableKey = new MasterTreeInfo( parentIds.get( 0 ), normRdn, upRdn );
+        }
+        else
+        {
+            upRdn = entry.getDn().getRdn().getUpName();
+            normRdn = entry.getDn().getRdn().getNormName();
+            treeTableKey = new MasterTreeInfo( parentIds.get( 0 ), normRdn, upRdn );
+        }
+
+        // put to tree table
+        Put treePut = new Put( treeTableKey.treeTableKey );
+        treePut.add( TREE_INFO_FAMILY, ID_QUALIFIER, Bytes.toBytes( id ) );
+        for ( EntryAttribute attribute : entry )
+        {
+            String attrOid = ( ( ServerAttribute ) attribute ).getAttributeType().getOid();
+            for ( int i = 0; i < attribute.size(); i++ )
+            {
+                Value<?> value = attribute.get( i );
 
                 // normAttributes:
                 // 2.5.4.0:inetorgperson -> 0
@@ -205,31 +253,33 @@
                 }
             }
         }
-        HBaseTableHelper.put( getMasterTablePool(), masterTableName, masterPut );
         HBaseTableHelper.put( getTreeTablePool(), treeTableName, treePut );
 
-        // update parent one-level count
-        MasterTreeInfo parentKey = fetchMasterTreeInfo( parentIds.get( 0 ) );
-        if ( parentKey != null )
+        if ( maintainCounters )
         {
-            HBaseTableHelper.increment( getTreeTablePool(), treeTableName, parentKey.treeTableKey, TREE_INFO_FAMILY,
-                ONE_LEVEL_COUNT_QUALIFIER );
-        }
-
-        // update all parents sub-level count
-        for ( Long parentId : parentIds )
-        {
-            parentKey = fetchMasterTreeInfo( parentId );
+            // update parent one-level count
+            MasterTreeInfo parentKey = fetchMasterTreeInfo( parentIds.get( 0 ) );
             if ( parentKey != null )
             {
                 HBaseTableHelper.increment( getTreeTablePool(), treeTableName, parentKey.treeTableKey,
-                    TREE_INFO_FAMILY, SUB_LEVEL_COUNT_QUALIFIER );
+                    TREE_INFO_FAMILY, ONE_LEVEL_COUNT_QUALIFIER );
             }
-        }
 
-        // clear caches
-        oneLevelCountCache.clear();
-        subLevelCountCache.clear();
+            // update all parents sub-level count
+            for ( Long parentId : parentIds )
+            {
+                parentKey = fetchMasterTreeInfo( parentId );
+                if ( parentKey != null )
+                {
+                    HBaseTableHelper.increment( getTreeTablePool(), treeTableName, parentKey.treeTableKey,
+                        TREE_INFO_FAMILY, SUB_LEVEL_COUNT_QUALIFIER );
+                }
+            }
+
+            // clear caches
+            oneLevelCountCache.clear();
+            subLevelCountCache.clear();
+        }
 
         return id;
     }
@@ -247,22 +297,25 @@
         Delete treeDel = new Delete( key.treeTableKey );
         HBaseTableHelper.delete( getTreeTablePool(), treeTableName, treeDel );
 
-        // update parent one-level count
-        Long parentId = key.parentId;
-        if ( parentId > ROOT_ID )
-        {
-            MasterTreeInfo parentKey = fetchMasterTreeInfo( parentId );
-            HBaseTableHelper.decrement( getTreeTablePool(), treeTableName, parentKey.treeTableKey, TREE_INFO_FAMILY,
-                ONE_LEVEL_COUNT_QUALIFIER );
-        }
-
-        // update sub-level count of all parents
-        while ( parentId > ROOT_ID )
-        {
-            MasterTreeInfo parentKey = fetchMasterTreeInfo( parentId );
-            HBaseTableHelper.decrement( getTreeTablePool(), treeTableName, parentKey.treeTableKey, TREE_INFO_FAMILY,
-                SUB_LEVEL_COUNT_QUALIFIER );
-            parentId = parentKey.parentId;
+        if ( maintainCounters )
+        {
+            // update parent one-level count
+            Long parentId = key.parentId;
+            if ( parentId > ROOT_ID )
+            {
+                MasterTreeInfo parentKey = fetchMasterTreeInfo( parentId );
+                HBaseTableHelper.decrement( getTreeTablePool(), treeTableName, parentKey.treeTableKey,
+                    TREE_INFO_FAMILY, ONE_LEVEL_COUNT_QUALIFIER );
+            }
+
+            // update sub-level count of all parents
+            while ( parentId > ROOT_ID )
+            {
+                MasterTreeInfo parentKey = fetchMasterTreeInfo( parentId );
+                HBaseTableHelper.decrement( getTreeTablePool(), treeTableName, parentKey.treeTableKey,
+                    TREE_INFO_FAMILY, SUB_LEVEL_COUNT_QUALIFIER );
+                parentId = parentKey.parentId;
+            }
         }
 
         // clear caches
@@ -590,32 +643,46 @@
         return fetchId( suffixMti );
     }
 
+    private List<Long> ids = new ArrayList<Long>();
+
 
     private long nextId() throws Exception
     {
-        byte[] id = HBaseTableHelper.increment( getMasterTablePool(), masterTableName, SEQUENCE_ROW, TREE_INFO_FAMILY,
-            SEQUENCE_QUALIFIER );
-        return Bytes.toLong( id );
+        if ( ids.isEmpty() )
+        {
+            long amount = 100;
+            byte[] id = HBaseTableHelper.increment( getMasterTablePool(), masterTableName, SEQUENCE_ROW,
+                TREE_INFO_FAMILY, SEQUENCE_QUALIFIER, amount );
+            long upper = Bytes.toLong( id );
+            long lower = upper - amount + 1;
+            for ( long l = lower; l <= upper; l++ )
+            {
+                ids.add( l );
+            }
+        }
+
+        Long id = ids.remove( 0 );
+        return id;
     }
 
 
-    private HTablePool getMasterTablePool() throws Exception
+    private HBaseTablePool getMasterTablePool() throws Exception
     {
         if ( masterTablePool == null )
         {
             HBaseTableHelper.createTable( configuration, masterTableName, TREE_INFO_FAMILY, UP_ATTRIBUTES_FAMILY );
-            masterTablePool = new HTablePool( configuration, 16 );
+            masterTablePool = new HBaseTablePool( masterTableName, configuration );
         }
         return masterTablePool;
     }
 
 
-    private HTablePool getTreeTablePool() throws Exception
+    private HBaseTablePool getTreeTablePool() throws Exception
     {
         if ( treeTablePool == null )
         {
             HBaseTableHelper.createTable( configuration, treeTableName, TREE_INFO_FAMILY, NORM_ATTRIBUTES_FAMILY );
-            treeTablePool = new HTablePool( configuration, 16 );
+            treeTablePool = new HBaseTablePool( treeTableName, configuration );
         }
         return treeTablePool;
     }
@@ -697,4 +764,16 @@
         }
     }
 
+
+    public String getTreeTableName()
+    {
+        return treeTableName;
+    }
+
+
+    public String getMasterTableName()
+    {
+        return masterTableName;
+    }
+
 }

Modified: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBasePresenceIndexTable.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBasePresenceIndexTable.java?rev=912434&r1=912433&r2=912434&view=diff
==============================================================================
--- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBasePresenceIndexTable.java (original)
+++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBasePresenceIndexTable.java Sun Feb 21 22:59:19 2010
@@ -21,13 +21,14 @@
 
 
 import org.apache.directory.server.core.partition.hbase.Cache;
+import org.apache.directory.server.core.partition.hbase.HBaseStore;
+import org.apache.directory.server.core.partition.hbase.cursor.HBasePresenceIndexCursor;
 import org.apache.directory.shared.ldap.schema.SchemaManager;
 import org.apache.directory.shared.ldap.util.Base64;
 import org.apache.directory.shared.ldap.util.ByteBuffer;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTablePool;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
@@ -48,35 +49,35 @@
     public static final byte[] COUNT_ROW = Bytes.toBytes( "!" );
     public static final byte[] INFO_FAMILY = Bytes.toBytes( "info" );
     public static final byte[] ID_QUALIFIER = Bytes.toBytes( "id" );
-    public static final byte[] COUNT_QUALIFIER = Bytes.toBytes( "count" );
     public static final byte[] VALUE_SCAN_FIRST_ENTRYID = new byte[]
         { 0x00 };
     public static final byte[] VALUE_SCAN_LAST_ENTRYID = new byte[]
         { ( byte ) 0xFF };
 
     protected String indexTableName;
-    private HTablePool indexTablePool;
-    private SchemaManager schemaManager;
-    private HBaseConfiguration configuration;
+    private HBaseTablePool indexTablePool;
+    private HBaseStore store;
     private String attributeTypeOid;
 
-    private Cache<String, Long> countCache;
+    private Cache<String, Integer> countCache;
 
 
-    public HBasePresenceIndexTable( String attributeTypeOid, SchemaManager schemaManager, String tablePrefix,
-        HBaseConfiguration configuration, int cacheSize ) throws Exception
+    public HBasePresenceIndexTable( String attributeTypeOid, HBaseStore store, int cacheSize ) throws Exception
     {
         this.attributeTypeOid = attributeTypeOid;
-        this.schemaManager = schemaManager;
-        this.configuration = configuration;
-        String name = schemaManager.getGlobalOidRegistry().getPrimaryName( attributeTypeOid );
-        this.indexTableName = tablePrefix + "index_" + name;
-        this.countCache = new Cache<String, Long>( cacheSize );
+        this.store = store;
+        String name = store.getSchemaManager().getGlobalOidRegistry().getPrimaryName( attributeTypeOid );
+        this.indexTableName = store.getTablePrefix() + "index_" + name;
+        this.countCache = new Cache<String, Integer>( cacheSize );
     }
 
 
     public void close() throws Exception
     {
+        if ( indexTablePool != null )
+        {
+            indexTablePool.close();
+        }
     }
 
 
@@ -121,10 +122,23 @@
             return countCache.get( attributeTypeOid ).intValue();
         }
 
-        Long count = HBaseTableHelper.getLongValue( getIndexTablePool(), indexTableName, COUNT_ROW, INFO_FAMILY,
-            COUNT_QUALIFIER, 0L );
+        // TODO: scan directly instead of using the cursor?
+        HBasePresenceIndexCursor cursor = new HBasePresenceIndexCursor( this );
+        int count = 0;
+        int limit = 100;
+        while ( cursor.next() && count <= limit )
+        {
+            count++;
+        }
+        if ( count >= limit )
+        {
+            // this is just a guess to avoid subtree scan
+            count = store.count() / 10;
+        }
+        cursor.close();
+
         countCache.put( attributeTypeOid, count );
-        return count.intValue();
+        return count;
     }
 
 
@@ -153,20 +167,10 @@
     public void add( Long entryId ) throws Exception
     {
         // presence (attribute=*): *<id> -> id
-        // check first if the index already exists because we won't increment the index count
         byte[] presenceRow = getPresenceKey( Bytes.toBytes( entryId ) );
-        Get presenceGet = new Get( presenceRow );
-        if ( !HBaseTableHelper.exists( getIndexTablePool(), indexTableName, presenceGet ) )
-        {
-            // get+put+put is not atomic!
-            Put presencePut = new Put( presenceRow );
-            presencePut.setWriteToWAL( false );
-            presencePut.add( INFO_FAMILY, ID_QUALIFIER, Bytes.toBytes( entryId ) );
-            HBaseTableHelper.put( getIndexTablePool(), indexTableName, presencePut );
-
-            // increment existence count: attribute: -> count
-            HBaseTableHelper.increment( getIndexTablePool(), indexTableName, COUNT_ROW, INFO_FAMILY, COUNT_QUALIFIER );
-        }
+        Put presencePut = new Put( presenceRow );
+        presencePut.add( INFO_FAMILY, ID_QUALIFIER, Bytes.toBytes( entryId ) );
+        HBaseTableHelper.put( getIndexTablePool(), indexTableName, presencePut );
 
         countCache.clear();
     }
@@ -175,29 +179,21 @@
     public void drop( Long entryId ) throws Exception
     {
         // presence (attribute=*): *<id> -> id
-        // check first if the index exists because we won't decrement the index count otherwise
         byte[] presenceRow = getPresenceKey( Bytes.toBytes( entryId ) );
-        Get presenceGet = new Get( presenceRow );
-        if ( HBaseTableHelper.exists( getIndexTablePool(), indexTableName, presenceGet ) )
-        {
-            Delete presenceDel = new Delete( presenceRow );
-            HBaseTableHelper.delete( getIndexTablePool(), indexTableName, presenceDel );
-
-            // decrement existence count: attribute: -> count
-            HBaseTableHelper.decrement( getIndexTablePool(), indexTableName, COUNT_ROW, INFO_FAMILY, COUNT_QUALIFIER );
-        }
+        Delete presenceDel = new Delete( presenceRow );
+        HBaseTableHelper.delete( getIndexTablePool(), indexTableName, presenceDel );
 
         countCache.clear();
     }
 
 
-    protected HTablePool getIndexTablePool() throws Exception
+    protected HBaseTablePool getIndexTablePool() throws Exception
     {
         if ( indexTablePool == null )
         {
             // ensure table is created
-            HBaseTableHelper.createTable( configuration, indexTableName, INFO_FAMILY );
-            indexTablePool = new HTablePool( configuration, 16 );
+            HBaseTableHelper.createTable( store.getConfiguration(), indexTableName, INFO_FAMILY );
+            indexTablePool = new HBaseTablePool( indexTableName, store.getConfiguration() );
         }
         return indexTablePool;
     }

Modified: directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseRowIndexTable.java
URL: http://svn.apache.org/viewvc/directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseRowIndexTable.java?rev=912434&r1=912433&r2=912434&view=diff
==============================================================================
--- directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseRowIndexTable.java (original)
+++ directory/sandbox/seelmann/hbase-partition/src/main/java/org/apache/directory/server/core/partition/hbase/table/HBaseRowIndexTable.java Sun Feb 21 22:59:19 2010
@@ -20,11 +20,13 @@
 package org.apache.directory.server.core.partition.hbase.table;
 
 
+import org.apache.directory.server.core.entry.ServerEntry;
+import org.apache.directory.server.core.partition.hbase.HBaseStore;
+import org.apache.directory.server.core.partition.hbase.index.HBaseUserIndex;
+import org.apache.directory.server.xdbm.IndexCursor;
 import org.apache.directory.shared.ldap.schema.AttributeType;
-import org.apache.directory.shared.ldap.schema.SchemaManager;
 import org.apache.directory.shared.ldap.util.Base64;
 import org.apache.directory.shared.ldap.util.ByteBuffer;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
@@ -45,12 +47,13 @@
 public class HBaseRowIndexTable extends HBaseIndexTableBase
 {
     private static final Logger LOG = LoggerFactory.getLogger( HBaseRowIndexTable.class );
+    private HBaseStore store;
 
 
-    public HBaseRowIndexTable( String attributeTypeOid, SchemaManager schemaManager, String tablePrefix,
-        HBaseConfiguration configuration, int cacheSize ) throws Exception
+    public HBaseRowIndexTable( String attributeTypeOid, HBaseStore store, int cacheSize ) throws Exception
     {
-        super( attributeTypeOid, schemaManager, tablePrefix, configuration, cacheSize );
+        super( attributeTypeOid, store.getSchemaManager(), store.getTablePrefix(), store.getConfiguration(), cacheSize );
+        this.store = store;
     }
 
 
@@ -146,15 +149,24 @@
             return countCache.get( value ).intValue();
         }
 
-        byte[] row = getCountKey( value );
-        if ( row == null )
+        // TODO: scan directly instead of using the cursor?
+        HBaseUserIndex<HBaseIndexTable> index = store.getUserIndex( attributeTypeOid );
+        IndexCursor<Object, ServerEntry> cursor = index.forwardCursor( value );
+        int count = 0;
+        int limit = 100;
+        while ( cursor.next() && count <= limit )
+        {
+            count++;
+        }
+        if ( count >= limit )
         {
-            return 0;
+            // this is just a guess to avoid subtree scan
+            count = store.count() / 10;
         }
-        Long count = HBaseTableHelper.getLongValue( getIndexTablePool(), indexTableName, row, INFO_FAMILY,
-            COUNT_QUALIFIER, 0L );
+        cursor.close();
+
         countCache.put( value, count );
-        return count.intValue();
+        return count;
     }
 
 
@@ -181,22 +193,10 @@
     public void add( byte[] value, Long id ) throws Exception
     {
         // exact match (attribute=value): =value<id> -> id, value
-        // check first if the index already exists because we won't increment the index count
         byte[] exactRow = getEqualsKey( value, id );
-        Get exactGet = new Get( exactRow );
-        if ( !HBaseTableHelper.exists( getIndexTablePool(), indexTableName, exactGet ) )
-        {
-            // get+put+put is not atomic!
-            Put exactPut = new Put( exactRow );
-            //exactPut.setWriteToWAL( false );
-            exactPut.add( INFO_FAMILY, ID_QUALIFIER, Bytes.toBytes( id ) );
-            HBaseTableHelper.put( getIndexTablePool(), indexTableName, exactPut );
-
-            // increment exact match count: #value -> count
-            byte[] exactCountRow = getCountKey( value );
-            HBaseTableHelper.increment( getIndexTablePool(), indexTableName, exactCountRow, INFO_FAMILY,
-                COUNT_QUALIFIER );
-        }
+        Put exactPut = new Put( exactRow );
+        exactPut.add( INFO_FAMILY, ID_QUALIFIER, Bytes.toBytes( id ) );
+        HBaseTableHelper.put( getIndexTablePool(), indexTableName, exactPut );
 
         // TODO: optimize - don't need to clear the whole cache
         countCache.clear();
@@ -207,20 +207,9 @@
     public void drop( byte[] value, Long id ) throws Exception
     {
         // exact match (attribute=value): =value<id> -> id
-        // check first if the index exists because we won't decrement the index count otherwise
         byte[] exactRow = getEqualsKey( value, id );
-        Get exactGet = new Get( exactRow );
-        if ( HBaseTableHelper.exists( getIndexTablePool(), indexTableName, exactGet ) )
-        {
-            Delete exactDel = new Delete( exactRow );
-            HBaseTableHelper.delete( getIndexTablePool(), indexTableName, exactDel );
-
-            // decrement exact match count: #value -> count
-            byte[] exactCountRow = getCountKey( value );
-            HBaseTableHelper.decrement( getIndexTablePool(), indexTableName, exactCountRow, INFO_FAMILY,
-                COUNT_QUALIFIER );
-            // TODO: delete column if count is 0?
-        }
+        Delete exactDel = new Delete( exactRow );
+        HBaseTableHelper.delete( getIndexTablePool(), indexTableName, exactDel );
 
         // TODO: optimize - don't need to clear the whole cache
         countCache.clear();