Posted to commits@jackrabbit.apache.org by mr...@apache.org on 2009/02/22 23:28:13 UTC

svn commit: r746838 - in /jackrabbit/sandbox/jackrabbit-hadoop: ./ src/main/java/org/apache/jackrabbit/hadoop/ src/main/java/org/apache/jackrabbit/hadoop/journal/

Author: mreutegg
Date: Sun Feb 22 22:28:11 2009
New Revision: 746838

URL: http://svn.apache.org/viewvc?rev=746838&view=rev
Log:
Journal implementation on top of HBase
-> requires patched HBase 0.19.0!

Added:
    jackrabbit/sandbox/jackrabbit-hadoop/hbase-0.19.0.patch
    jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/HBaseStringIndex.java   (with props)
    jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/journal/
    jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/journal/HBaseJournal.java   (with props)
    jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/journal/HBaseRecordIterator.java   (with props)
    jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/journal/HBaseRevision.java   (with props)
Modified:
    jackrabbit/sandbox/jackrabbit-hadoop/README.txt
    jackrabbit/sandbox/jackrabbit-hadoop/pom.xml
    jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/HBasePersistenceManager.java

Modified: jackrabbit/sandbox/jackrabbit-hadoop/README.txt
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/jackrabbit-hadoop/README.txt?rev=746838&r1=746837&r2=746838&view=diff
==============================================================================
--- jackrabbit/sandbox/jackrabbit-hadoop/README.txt (original)
+++ jackrabbit/sandbox/jackrabbit-hadoop/README.txt Sun Feb 22 22:28:11 2009
@@ -1,11 +1,14 @@
 This module provides an implementation of a PersistenceManager
-on top of Hadoop HBase and a DataStore on Hadoop HDFS.
+and a Journal on top of Hadoop HBase and a DataStore on Hadoop HDFS.
+
+The Journal implementation requires a patch to hbase-0.19.0; without it,
+RowLocks are not used when a transaction in HBase is committed.
+No HBase issue has been filed yet, but I'll do that soon.
 
 Building this module requires Java 6, and you need to manually
 deploy the two dependencies hbase-0.19.0.jar and hadoop-core-0.19.0.jar.
 Simply run 'mvn install' and you will get instructions on how
 this can be done.
 
-The default configuration for both the PersistenceManager as
-well as the DataStore assume that Hadoop is running on
+The default configuration for all modules assumes that Hadoop is running on
 hdfs://localhost:9000
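
For illustration, a minimal sketch (not part of this commit) of configuring
the journal programmatically; in practice HBaseJournal would be declared in
the repository configuration and initialized by Jackrabbit. The node id
"node1", the resolver argument, and the "/hbase" path suffix are assumptions:

    import org.apache.jackrabbit.hadoop.journal.HBaseJournal;
    import org.apache.jackrabbit.spi.commons.namespace.NamespaceResolver;

    public class JournalConfigSketch {
        public static HBaseJournal create(NamespaceResolver resolver)
                throws Exception {
            HBaseJournal journal = new HBaseJournal();
            // overrides the default "hbase.rootdir"; the exact path under
            // hdfs://localhost:9000 depends on the HBase installation
            journal.setRootdir("hdfs://localhost:9000/hbase");
            // "node1" is a hypothetical cluster node id
            journal.init("node1", resolver);
            return journal;
        }
    }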

Added: jackrabbit/sandbox/jackrabbit-hadoop/hbase-0.19.0.patch
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/jackrabbit-hadoop/hbase-0.19.0.patch?rev=746838&view=auto
==============================================================================
--- jackrabbit/sandbox/jackrabbit-hadoop/hbase-0.19.0.patch (added)
+++ jackrabbit/sandbox/jackrabbit-hadoop/hbase-0.19.0.patch Sun Feb 22 22:28:11 2009
@@ -0,0 +1,90 @@
+Index: src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+===================================================================
+--- src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(revision 746835)
++++ src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java	(working copy)
+@@ -1854,7 +1854,7 @@
+    * @return intId Integer row lock used internally in HRegion
+    * @throws IOException Thrown if this is not a valid client lock id.
+    */
+-  private Integer getLockFromId(long lockId)
++  protected Integer getLockFromId(long lockId)
+   throws IOException {
+     if(lockId == -1L) {
+       return null;
+Index: src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java
+===================================================================
+--- src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java	(revision 746835)
++++ src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java	(working copy)
+@@ -348,10 +348,10 @@
+    * @param b
+    * @throws IOException
+    */
+-  public void batchUpdate(final long transactionId, final BatchUpdate b)
++  public void batchUpdate(final long transactionId, final BatchUpdate b, Integer lockId)
+       throws IOException {
+     TransactionState state = getTransactionState(transactionId);
+-    state.addWrite(b);
++    state.addWrite(b, lockId);
+     logManager.writeUpdateToLog(transactionId, b);
+   }
+ 
+@@ -500,8 +500,10 @@
+       logManager.writeCommitToLog(state.getTransactionId());
+     }
+ 
++    List<Integer> lockIds = state.getReadLockSet();
++    int i = 0;
+     for (BatchUpdate update : state.getWriteSet()) {
+-      this.batchUpdate(update, false); // Don't need to WAL these
++      this.batchUpdate(update, lockIds.get(i++), false); // Don't need to WAL these
+       // FIME, maybe should be walled so we don't need to look so far back.
+     }
+ 
+Index: src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java
+===================================================================
+--- src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java	(revision 746835)
++++ src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java	(working copy)
+@@ -143,7 +143,7 @@
+     checkOpen();
+     super.getRequestCount().incrementAndGet();
+     try {
+-      getTransactionalRegion(regionName).batchUpdate(transactionId, b);
++      getTransactionalRegion(regionName).batchUpdate(transactionId, b, getLockFromId(b.getRowLock()));
+     } catch (IOException e) {
+       checkFileSystem();
+       throw e;
+Index: src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java
+===================================================================
+--- src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java	(revision 746835)
++++ src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java	(working copy)
+@@ -64,6 +64,7 @@
+   private SortedSet<byte[]> readSet = new TreeSet<byte[]>(
+       Bytes.BYTES_COMPARATOR);
+   private List<BatchUpdate> writeSet = new LinkedList<BatchUpdate>();
++  private List<Integer> readLockSet = new LinkedList<Integer>();
+   private Set<TransactionState> transactionsToCheck = new HashSet<TransactionState>();
+   private int startSequenceNumber;
+   private Integer sequenceNumber;
+@@ -88,13 +89,22 @@
+   }
+ 
+   public void addWrite(final BatchUpdate write) {
++    addWrite(write, null);
++  }
++
++  public void addWrite(final BatchUpdate write, final Integer lockId) {
+     writeSet.add(write);
++    readLockSet.add(lockId);
+   }
+ 
+   public List<BatchUpdate> getWriteSet() {
+     return writeSet;
+   }
+ 
++  public List<Integer> getReadLockSet() {
++    return readLockSet;
++  }
++
+   /**
+    * GetFull from the writeSet.
+    * 
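
What the patch enables, sketched from the client side: a RowLock taken with
lockRow() can be attached to a transactional BatchUpdate, and the patched
region server then reuses that lock when the transaction commits instead of
acquiring a new one. A hedged sketch of the pattern (the table, transaction
state, row, column, and value are assumed to be supplied by the caller;
HBaseJournal.doLock() below uses the same calls for real):

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.RowLock;
    import org.apache.hadoop.hbase.client.transactional.TransactionState;
    import org.apache.hadoop.hbase.client.transactional.TransactionalTable;
    import org.apache.hadoop.hbase.io.BatchUpdate;

    public class LockedUpdateSketch {
        static void updateUnderLock(TransactionalTable table,
                                    TransactionState state,
                                    byte[] row, byte[] column, byte[] value)
                throws IOException {
            RowLock lock = table.lockRow(row);
            try {
                BatchUpdate update = new BatchUpdate(row);
                update.put(column, value);
                // the lock id travels with the update; with the patch the
                // region server honors it when the transaction commits
                update.setRowLock(lock.getLockId());
                table.commit(state, update);
            } finally {
                table.unlockRow(lock);
            }
        }
    }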

Modified: jackrabbit/sandbox/jackrabbit-hadoop/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/jackrabbit-hadoop/pom.xml?rev=746838&r1=746837&r2=746838&view=diff
==============================================================================
--- jackrabbit/sandbox/jackrabbit-hadoop/pom.xml (original)
+++ jackrabbit/sandbox/jackrabbit-hadoop/pom.xml Sun Feb 22 22:28:11 2009
@@ -21,74 +21,76 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                              http://maven.apache.org/maven-v4_0_0.xsd">
-  <modelVersion>4.0.0</modelVersion>
+    <modelVersion>4.0.0</modelVersion>
 
-<!-- ====================================================================== -->
-<!-- P R O J E C T  D E S C R I P T I O N                                   -->
-<!-- ====================================================================== -->
-  <groupId>org.apache.jackrabbit</groupId>
-  <artifactId>jackrabbit-hadoop</artifactId>
-  <version>1.6-SNAPSHOT</version>
-  <name>Jackrabbit on Hadoop and HBase</name>
-  <description>Jackrabbit PersistenceManager on HBase and DataStore on Hadoop</description>
+    <!-- ====================================================================== -->
+    <!-- P R O J E C T  D E S C R I P T I O N                                   -->
+    <!-- ====================================================================== -->
+    <groupId>org.apache.jackrabbit</groupId>
+    <artifactId>jackrabbit-hadoop</artifactId>
+    <version>1.6-SNAPSHOT</version>
+    <name>Jackrabbit on Hadoop and HBase</name>
+    <description>Jackrabbit PersistenceManager and Journal on HBase and
+        DataStore on Hadoop
+    </description>
 
-  <dependencies>
-    <dependency>
-      <groupId>javax.jcr</groupId>
-      <artifactId>jcr</artifactId>
-      <version>1.0</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.jackrabbit</groupId>
-      <artifactId>jackrabbit-core</artifactId>
-      <version>1.6-SNAPSHOT</version>
-      <exclusions>
-        <exclusion>
-          <groupId>commons-logging</groupId>
-          <artifactId>commons-logging</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-log4j12</artifactId>
-      <version>1.5.3</version>
-    </dependency>
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>jcl-over-slf4j</artifactId>
-      <version>1.5.3</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hbase</artifactId>
-      <version>0.19.0</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-core</artifactId>
-      <version>0.19.0</version>
-    </dependency>
-  </dependencies>
-  
-  <build>
-    <plugins>
-      <plugin>
-        <artifactId>maven-compiler-plugin</artifactId>
-        <configuration>
-          <target>1.6</target>
-          <source>1.6</source>
-        </configuration>
-      </plugin>
-      <plugin>
-        <artifactId>maven-idea-plugin</artifactId>
-        <version>2.0</version>
-        <configuration>
-          <downloadSources>true</downloadSources>
-          <jdkLevel>1.6</jdkLevel>
-        </configuration>
-      </plugin>
-    </plugins>
-  </build>
+    <dependencies>
+        <dependency>
+            <groupId>javax.jcr</groupId>
+            <artifactId>jcr</artifactId>
+            <version>1.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.jackrabbit</groupId>
+            <artifactId>jackrabbit-core</artifactId>
+            <version>1.6-SNAPSHOT</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>1.5.3</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jcl-over-slf4j</artifactId>
+            <version>1.5.3</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hbase</artifactId>
+            <version>0.19.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-core</artifactId>
+            <version>0.19.0</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <target>1.6</target>
+                    <source>1.6</source>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-idea-plugin</artifactId>
+                <version>2.0</version>
+                <configuration>
+                    <downloadSources>true</downloadSources>
+                    <jdkLevel>1.6</jdkLevel>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
 
 </project>

Modified: jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/HBasePersistenceManager.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/HBasePersistenceManager.java?rev=746838&r1=746837&r2=746838&view=diff
==============================================================================
--- jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/HBasePersistenceManager.java (original)
+++ jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/HBasePersistenceManager.java Sun Feb 22 22:28:11 2009
@@ -36,6 +36,7 @@
 import org.apache.jackrabbit.core.persistence.util.Serializer;
 import org.apache.jackrabbit.core.NodeId;
 import org.apache.jackrabbit.core.NodeIdIterator;
+import org.apache.jackrabbit.core.util.StringIndex;
 import org.apache.jackrabbit.core.state.ItemStateException;
 import org.apache.jackrabbit.core.state.NodeReferences;
 import org.apache.jackrabbit.core.state.NodeReferencesId;
@@ -63,7 +64,11 @@
 
     private String tablePrefix = null;
 
-    private static final byte[] DATA_COLUMN = Bytes.toBytes("data:");
+    static final byte[] DATA_COLUMN = Bytes.toBytes("data:");
+
+    static final byte[] NAME_COLUMN = Bytes.toBytes("name:");
+
+    private StringIndex nameIndex;
 
     private HBaseConfiguration config = new HBaseConfiguration();
 
@@ -71,6 +76,8 @@
 
     private TransactionalTable noderefs;
 
+    private TransactionalTable nameIdx;
+
     private BundleBinding binding;
 
     private TransactionManager txMgr;
@@ -91,6 +98,7 @@
         // set table names
         byte[] bundlesTable = Bytes.toBytes(tablePrefix + "bundles");
         byte[] nodeRefsTable = Bytes.toBytes(tablePrefix + "noderefs");
+        byte[] namesTable = Bytes.toBytes(tablePrefix + "names");
 
         HBaseAdmin admin = new HBaseAdmin(config);
         if (!admin.tableExists(bundlesTable)) {
@@ -105,10 +113,19 @@
             admin.createTable(noderefs);
             admin.enableTable(nodeRefsTable);
         }
+        if (!admin.tableExists(namesTable)) {
+            HTableDescriptor descriptor = new HTableDescriptor(namesTable);
+            descriptor.addFamily(new HColumnDescriptor(NAME_COLUMN));
+            admin.createTable(descriptor);
+            admin.enableTable(namesTable);
+        }
         txMgr = new TransactionManager(config);
 
         bundles = new TransactionalTable(config, bundlesTable);
         noderefs = new TransactionalTable(config, nodeRefsTable);
+        nameIdx = new TransactionalTable(config, namesTable);
+
+        nameIndex = new HBaseStringIndex(nameIdx);
 
         binding = new BundleBinding(new ErrorHandling(ErrorHandling.IGNORE_MISSING_BLOBS),
                 null, getNsIndex(), getNameIndex(), context.getDataStore());
@@ -236,6 +253,11 @@
         } catch (IOException e) {
             // TODO: log
         }
+        try {
+            nameIdx.close();
+        } catch (IOException e) {
+            // TODO: log
+        }
     }
 
     public boolean exists(NodeReferencesId id) throws ItemStateException {
@@ -293,6 +315,10 @@
         }
     }
 
+    public StringIndex getNameIndex() {
+        return nameIndex;
+    }
+
     //---------------------------< parameters >---------------------------------
 
     public String getRootdir() {

Added: jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/HBaseStringIndex.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/HBaseStringIndex.java?rev=746838&view=auto
==============================================================================
--- jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/HBaseStringIndex.java (added)
+++ jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/HBaseStringIndex.java Sun Feb 22 22:28:11 2009
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.hadoop;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.io.IOException;
+
+import org.apache.jackrabbit.core.util.StringIndex;
+import org.apache.hadoop.hbase.client.transactional.TransactionalTable;
+import org.apache.hadoop.hbase.client.Scanner;
+import org.apache.hadoop.hbase.client.RowLock;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * <code>HBaseStringIndex</code> implements StringIndex on top of an HBase table.
+ */
+public class HBaseStringIndex implements StringIndex {
+
+    private static final byte[] LOCK_ROW = new byte[]{(byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff};
+
+    private final TransactionalTable table;
+
+    // caches
+    private final Map<String, Integer> string2Index = new HashMap<String, Integer>();
+    private final Map<Integer, String> index2String = new HashMap<Integer, String>();
+
+    public HBaseStringIndex(TransactionalTable table) throws IOException {
+        this.table = table;
+        // make sure lock row exists
+        checkLockRow();
+        // read names
+        readNames(0);
+    }
+
+    public int stringToIndex(String string) throws IllegalArgumentException {
+        Integer index = string2Index.get(string);
+        if (index != null) {
+            return index;
+        }
+        try {
+            return addIfNotFound(string);
+        } catch (IOException e) {
+            throw new IllegalArgumentException(e);
+        }
+    }
+
+    public String indexToString(int idx) throws IllegalArgumentException {
+        if (idx < 0) {
+            throw new IllegalArgumentException("idx < 0");
+        }
+        String string = index2String.get(idx);
+        if (string == null) {
+            // the index may have been added by another cluster node;
+            // refresh the cache before giving up
+            try {
+                readNames(string2Index.size());
+            } catch (IOException e) {
+                throw new IllegalArgumentException(e);
+            }
+            string = index2String.get(idx);
+        }
+        if (string == null) {
+            throw new IllegalArgumentException("Unknown index: " + idx);
+        }
+        return string;
+    }
+
+    protected void checkLockRow() throws IOException {
+        Cell cell = table.get(LOCK_ROW, HBasePersistenceManager.NAME_COLUMN);
+        if (cell == null) {
+            BatchUpdate update = new BatchUpdate(LOCK_ROW);
+            update.put(HBasePersistenceManager.NAME_COLUMN, Bytes.toBytes("lock"));
+            table.commit(update);
+            table.flushCommits();
+        }
+    }
+
+    protected int addIfNotFound(String string) throws IOException {
+        RowLock lock = lock();
+        try {
+            // make sure we are up-to-date
+            readNames(string2Index.size());
+            // check if string is present
+            Integer index = string2Index.get(string);
+            if (index == null) {
+                // add it
+                index = string2Index.size();
+                BatchUpdate update = new BatchUpdate(Bytes.toBytes(index));
+                update.put(HBasePersistenceManager.NAME_COLUMN, Bytes.toBytes(string));
+                table.commit(update);
+                table.flushCommits();
+                string2Index.put(string, index);
+                index2String.put(index, string);
+            }
+            return index;
+        } finally {
+            unlock(lock);
+        }
+    }
+
+    protected Scanner getScanner(int start) throws IOException {
+        return table.getScanner(new byte[][]{HBasePersistenceManager.NAME_COLUMN}, Bytes.toBytes(start), LOCK_ROW);
+    }
+
+    protected RowLock lock() throws IOException {
+        return table.lockRow(LOCK_ROW);
+    }
+
+    protected void unlock(RowLock lock) throws IOException {
+        table.unlockRow(lock);
+    }
+
+    protected void readNames(int start) throws IOException {
+        Scanner s = getScanner(start);
+        try {
+            for (RowResult row : s) {
+                int idx = Bytes.toInt(row.getRow());
+                String string = Bytes.toString(row.get(HBasePersistenceManager.NAME_COLUMN).getValue());
+                string2Index.put(string, idx);
+                index2String.put(idx, string);
+            }
+        } finally {
+            s.close();
+        }
+    }
+}
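
A hypothetical usage sketch; the table argument stands for the "names"
TransactionalTable created in HBasePersistenceManager.init(), and the URI
is just an example string:

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.transactional.TransactionalTable;
    import org.apache.jackrabbit.core.util.StringIndex;
    import org.apache.jackrabbit.hadoop.HBaseStringIndex;

    public class StringIndexSketch {
        static void example(TransactionalTable names) throws IOException {
            StringIndex index = new HBaseStringIndex(names);
            // the mapping is persistent and round-trips
            int idx = index.stringToIndex("http://www.jcp.org/jcr/1.0");
            String uri = index.indexToString(idx);
        }
    }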

Propchange: jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/HBaseStringIndex.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/journal/HBaseJournal.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/journal/HBaseJournal.java?rev=746838&view=auto
==============================================================================
--- jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/journal/HBaseJournal.java (added)
+++ jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/journal/HBaseJournal.java Sun Feb 22 22:28:11 2009
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.hadoop.journal;
+
+import java.io.InputStream;
+import java.io.IOException;
+
+import org.apache.jackrabbit.core.journal.AbstractJournal;
+import org.apache.jackrabbit.core.journal.JournalException;
+import org.apache.jackrabbit.core.journal.AppendRecord;
+import org.apache.jackrabbit.core.journal.InstanceRevision;
+import org.apache.jackrabbit.core.journal.RecordIterator;
+import org.apache.jackrabbit.spi.commons.namespace.NamespaceResolver;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.client.transactional.TransactionalTable;
+import org.apache.hadoop.hbase.client.transactional.TransactionManager;
+import org.apache.hadoop.hbase.client.transactional.TransactionState;
+import org.apache.hadoop.hbase.client.transactional.CommitUnsuccessfulException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scanner;
+import org.apache.hadoop.hbase.client.RowLock;
+
+/**
+ * <code>HBaseJournal</code> is a cluster journal backed by HBase tables.
+ */
+public class HBaseJournal extends AbstractJournal {
+
+    static final byte[] JOURNAL_FAMILY = Bytes.toBytes("journal:");
+
+    static final byte[] JOURNAL_ID = Bytes.toBytes("journal:journal_id");
+
+    static final byte[] PRODUCER_ID = Bytes.toBytes("journal:producer_id");
+
+    static final byte[] REVISION_ID = Bytes.toBytes("journal:revision_id");
+
+    static final byte[] REVISION_DATA = Bytes.toBytes("journal:revision_data");
+
+    static final byte[] GLOBAL_REVISION_ROW = Bytes.toBytes(0);
+
+    private HBaseConfiguration config = new HBaseConfiguration();
+
+    private TransactionalTable journal;
+
+    private TransactionalTable localRevisions;
+
+    private TransactionalTable globalRevision;
+
+    private TransactionManager txMgr;
+
+    private TransactionState txState;
+
+    private InstanceRevision instanceRevision;
+
+    private RowLock globalRevisionLock;
+
+    /**
+     * Locked revision.
+     */
+    private long lockedRevision;
+
+    /**
+     * Auto commit level.
+     */
+    private int lockLevel;
+
+    public void init(String id, NamespaceResolver resolver)
+            throws JournalException {
+        super.init(id, resolver);
+        config.set("hbase.regionserver.class", "org.apache.hadoop.hbase.ipc.TransactionalRegionInterface");
+        config.set("hbase.regionserver.impl", "org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionServer");
+
+        txMgr = new TransactionManager(config);
+        try {
+            initializeTables();
+            instanceRevision = new HBaseRevision(localRevisions, id);
+        } catch (Exception e) {
+            closeTables();
+            throw new JournalException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * Save away the locked revision inside the newly appended record.
+     */
+    protected void appending(AppendRecord record) {
+        record.setRevision(lockedRevision);
+    }
+
+    protected void append(AppendRecord record, InputStream in, int length)
+            throws JournalException {
+        try {
+            BatchUpdate update = new BatchUpdate(Bytes.toBytes(record.getRevision()));
+            update.put(JOURNAL_ID, Bytes.toBytes(getId()));
+            update.put(PRODUCER_ID, Bytes.toBytes(record.getProducerId()));
+            byte[] data = new byte[length];
+            int remaining = length;
+            while (remaining > 0) {
+                int read = in.read(data, length - remaining, remaining);
+                if (read == -1) {
+                    break;
+                } else {
+                    remaining -= read;
+                }
+            }
+            update.put(REVISION_DATA, data);
+            journal.commit(txState, update);
+        } catch (IOException e) {
+            throw new JournalException(e.getMessage(), e);
+        }
+    }
+
+    public InstanceRevision getInstanceRevision() throws JournalException {
+        return instanceRevision;
+    }
+
+    public RecordIterator getRecords(long startRevision)
+            throws JournalException {
+        Scanner scanner;
+        try {
+            byte[][] columns = new byte[][]{JOURNAL_ID, PRODUCER_ID, REVISION_DATA};
+            scanner = journal.getScanner(columns, Bytes.toBytes(startRevision + 1));
+        } catch (IOException e) {
+            throw new JournalException(e.getMessage(), e);
+        }
+        try {
+            return new HBaseRecordIterator(scanner, getResolver(), getNamePathResolver());
+        } catch (IOException e) {
+            scanner.close();
+            throw new JournalException(e.getMessage(), e);
+        }
+    }
+
+    public RecordIterator getRecords() throws JournalException {
+        return getRecords(0);
+    }
+
+    protected void doLock() throws JournalException {
+        boolean succeeded = false;
+        try {
+            if (lockLevel++ == 0) {
+                txState = txMgr.beginTransaction();
+                globalRevisionLock = globalRevision.lockRow(GLOBAL_REVISION_ROW);
+            }
+
+            Cell cell = globalRevision.getRow(GLOBAL_REVISION_ROW,
+                    new byte[][]{REVISION_ID}, HConstants.LATEST_TIMESTAMP,
+                    1, globalRevisionLock).get(REVISION_ID);
+            long gr = Bytes.toLong(cell.getValue()) + 1;
+            BatchUpdate update = new BatchUpdate(GLOBAL_REVISION_ROW);
+            update.put(REVISION_ID, Bytes.toBytes(gr));
+            update.setRowLock(globalRevisionLock.getLockId());
+            globalRevision.commit(txState, update);
+            lockedRevision = gr;
+
+            succeeded = true;
+        } catch (IOException e) {
+            throw new JournalException(e.getMessage(), e);
+        } finally {
+            if (!succeeded) {
+                doUnlock(false);
+            }
+        }
+    }
+
+    protected void doUnlock(boolean successful) {
+        if (--lockLevel == 0) {
+            if (txState != null) {
+                try {
+                    if (successful) {
+                        txMgr.tryCommit(txState);
+                    } else {
+                        txMgr.abort(txState);
+                    }
+                } catch (IOException e) {
+                    // TODO: log
+                } catch (CommitUnsuccessfulException e) {
+                    // TODO: log
+                } finally {
+                    txState = null;
+                    if (globalRevisionLock != null) {
+                        try {
+                            globalRevision.unlockRow(globalRevisionLock);
+                        } catch (IOException e) {
+                            // TODO: log
+                        } finally {
+                            globalRevisionLock = null;
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    public void close() {
+        closeTables();
+    }
+
+    //--------------------------< parameters >----------------------------------
+
+    public String getRootdir() {
+        return config.get("hbase.rootdir");
+    }
+
+    public void setRootdir(String rootdir) {
+        config.set("hbase.rootdir", rootdir);
+    }
+
+    protected void initializeTables() throws IOException {
+        HBaseAdmin admin = new HBaseAdmin(config);
+        byte[] journalTable = Bytes.toBytes("journal");
+        if (!admin.tableExists(journalTable)) {
+            HTableDescriptor descriptor = new HTableDescriptor(journalTable);
+            descriptor.addFamily(new HColumnDescriptor(JOURNAL_FAMILY));
+            admin.createTable(descriptor);
+            admin.enableTable(journalTable);
+        }
+        journal = new TransactionalTable(config, journalTable);
+        byte[] localRevisionsTable = Bytes.toBytes("local_revisions");
+        if (!admin.tableExists(localRevisionsTable)) {
+            HTableDescriptor descriptor = new HTableDescriptor(localRevisionsTable);
+            descriptor.addFamily(new HColumnDescriptor(JOURNAL_FAMILY));
+            admin.createTable(descriptor);
+            admin.enableTable(localRevisionsTable);
+        }
+        localRevisions = new TransactionalTable(config, localRevisionsTable);
+        byte[] globalRevisionTable = Bytes.toBytes("global_revision");
+        if (!admin.tableExists(globalRevisionTable)) {
+            HTableDescriptor descriptor = new HTableDescriptor(globalRevisionTable);
+            descriptor.addFamily(new HColumnDescriptor(JOURNAL_FAMILY));
+            admin.createTable(descriptor);
+            admin.enableTable(globalRevisionTable);
+        }
+        globalRevision = new TransactionalTable(config, globalRevisionTable);
+
+        // make sure initial revision exists
+        if (globalRevision.get(GLOBAL_REVISION_ROW, REVISION_ID) == null) {
+            BatchUpdate update = new BatchUpdate(GLOBAL_REVISION_ROW);
+            update.put(REVISION_ID, new byte[0]);
+            globalRevision.commit(update);
+            globalRevision.flushCommits();
+        }
+    }
+
+    protected void closeTables() {
+        closeTable(journal);
+        closeTable(localRevisions);
+        closeTable(globalRevision);
+    }
+
+    protected static void closeTable(HTable table) {
+        try {
+            if (table != null) {
+                table.close();
+            }
+        } catch (IOException e) {
+            // TODO: log
+        }
+    }
+}
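
A hedged sketch of consuming records through the journal's public API;
lastRevision stands for whatever revision this instance has already seen:

    import org.apache.jackrabbit.core.journal.JournalException;
    import org.apache.jackrabbit.core.journal.Record;
    import org.apache.jackrabbit.core.journal.RecordIterator;
    import org.apache.jackrabbit.hadoop.journal.HBaseJournal;

    public class ConsumeRecordsSketch {
        static void consume(HBaseJournal journal, long lastRevision)
                throws JournalException {
            RecordIterator it = journal.getRecords(lastRevision);
            try {
                while (it.hasNext()) {
                    Record record = it.nextRecord();
                    // apply the record to the local instance ...
                }
            } finally {
                it.close();
            }
        }
    }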

Propchange: jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/journal/HBaseJournal.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/journal/HBaseRecordIterator.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/journal/HBaseRecordIterator.java?rev=746838&view=auto
==============================================================================
--- jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/journal/HBaseRecordIterator.java (added)
+++ jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/journal/HBaseRecordIterator.java Sun Feb 22 22:28:11 2009
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.hadoop.journal;
+
+import java.util.NoSuchElementException;
+import java.io.IOException;
+import java.io.DataInputStream;
+import java.io.ByteArrayInputStream;
+
+import org.apache.jackrabbit.core.journal.RecordIterator;
+import org.apache.jackrabbit.core.journal.Record;
+import org.apache.jackrabbit.core.journal.JournalException;
+import org.apache.jackrabbit.core.journal.ReadRecord;
+import org.apache.jackrabbit.spi.commons.conversion.NamePathResolver;
+import org.apache.jackrabbit.spi.commons.namespace.NamespaceResolver;
+import org.apache.hadoop.hbase.client.Scanner;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * <code>HBaseRecordIterator</code> iterates over journal records from a scanner.
+ */
+public class HBaseRecordIterator implements RecordIterator {
+
+    private final Scanner scanner;
+
+    private final NamespaceResolver resolver;
+
+    private final NamePathResolver npResolver;
+
+    private Record next;
+
+    public HBaseRecordIterator(Scanner scanner,
+                               NamespaceResolver resolver,
+                               NamePathResolver npResolver) throws IOException {
+        this.scanner = scanner;
+        this.resolver = resolver;
+        this.npResolver = npResolver;
+        fetchNext();
+    }
+
+    public boolean hasNext() {
+        return next != null;
+    }
+
+    public Record nextRecord() throws NoSuchElementException, JournalException {
+        if (next == null) {
+            throw new NoSuchElementException();
+        }
+        Record rec = next;
+        try {
+            fetchNext();
+        } catch (IOException e) {
+            throw new JournalException(e.getMessage(), e);
+        }
+        return rec;
+    }
+
+    public void close() {
+        scanner.close();
+    }
+
+    protected void fetchNext() throws IOException {
+        next = null;
+        RowResult row = scanner.next();
+        if (row != null) {
+            String journalId = Bytes.toString(row.get(HBaseJournal.JOURNAL_ID).getValue());
+            String producerId = Bytes.toString(row.get(HBaseJournal.PRODUCER_ID).getValue());
+            long revision = Bytes.toLong(row.getRow());
+            byte[] revisionData = row.get(HBaseJournal.REVISION_DATA).getValue();
+            DataInputStream in = new DataInputStream(new ByteArrayInputStream(revisionData));
+            next = new ReadRecord(journalId, producerId, revision,
+                    in, revisionData.length, resolver, npResolver);
+        }
+    }
+}

Propchange: jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/journal/HBaseRecordIterator.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/journal/HBaseRevision.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/journal/HBaseRevision.java?rev=746838&view=auto
==============================================================================
--- jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/journal/HBaseRevision.java (added)
+++ jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/journal/HBaseRevision.java Sun Feb 22 22:28:11 2009
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.hadoop.journal;
+
+import java.io.IOException;
+
+import org.apache.jackrabbit.core.journal.InstanceRevision;
+import org.apache.jackrabbit.core.journal.JournalException;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+
+/**
+ * <code>HBaseRevision</code> keeps the local instance revision in an HBase table.
+ */
+public class HBaseRevision implements InstanceRevision {
+
+    private final HTable localRevisions;
+
+    private final byte[] journalId;
+
+    private long revision;
+
+    public HBaseRevision(HTable localRevisions, String journalId)
+            throws JournalException {
+        this.localRevisions = localRevisions;
+        this.journalId = Bytes.toBytes(journalId);
+        try {
+            Cell cell = localRevisions.get(this.journalId, HBaseJournal.REVISION_ID);
+            if (cell == null) {
+                set(0);
+            } else {
+                revision = Bytes.toLong(cell.getValue());
+            }
+        } catch (IOException e) {
+            throw new JournalException(e.getMessage(), e);
+        }
+    }
+
+    public synchronized long get() throws JournalException {
+        return revision;
+    }
+
+    public synchronized void set(long value) throws JournalException {
+        BatchUpdate update = new BatchUpdate(journalId);
+        update.put(HBaseJournal.REVISION_ID, Bytes.toBytes(value));
+        try {
+            localRevisions.commit(update);
+        } catch (IOException e) {
+            throw new JournalException(e.getMessage(), e);
+        }
+        revision = value;
+    }
+
+    public void close() {
+    }
+}
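
A hypothetical usage sketch; "node1" is a made-up journal id, and the table
argument stands for the "local_revisions" table opened in
HBaseJournal.initializeTables():

    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.jackrabbit.core.journal.InstanceRevision;
    import org.apache.jackrabbit.core.journal.JournalException;
    import org.apache.jackrabbit.hadoop.journal.HBaseRevision;

    public class RevisionSketch {
        static void example(HTable localRevisions) throws JournalException {
            InstanceRevision rev = new HBaseRevision(localRevisions, "node1");
            long seen = rev.get();
            rev.set(seen + 1); // persisted immediately via a BatchUpdate
        }
    }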

Propchange: jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/journal/HBaseRevision.java
------------------------------------------------------------------------------
    svn:eol-style = native