Posted to commits@jackrabbit.apache.org by mr...@apache.org on 2009/02/22 23:28:13 UTC
svn commit: r746838 - in /jackrabbit/sandbox/jackrabbit-hadoop: ./
src/main/java/org/apache/jackrabbit/hadoop/
src/main/java/org/apache/jackrabbit/hadoop/journal/
Author: mreutegg
Date: Sun Feb 22 22:28:11 2009
New Revision: 746838
URL: http://svn.apache.org/viewvc?rev=746838&view=rev
Log:
Journal implementation on top of HBase
-> requires patched HBase 0.19.0!
Added:
jackrabbit/sandbox/jackrabbit-hadoop/hbase-0.19.0.patch
jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/HBaseStringIndex.java (with props)
jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/journal/
jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/journal/HBaseJournal.java (with props)
jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/journal/HBaseRecordIterator.java (with props)
jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/journal/HBaseRevision.java (with props)
Modified:
jackrabbit/sandbox/jackrabbit-hadoop/README.txt
jackrabbit/sandbox/jackrabbit-hadoop/pom.xml
jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/HBasePersistenceManager.java
Modified: jackrabbit/sandbox/jackrabbit-hadoop/README.txt
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/jackrabbit-hadoop/README.txt?rev=746838&r1=746837&r2=746838&view=diff
==============================================================================
--- jackrabbit/sandbox/jackrabbit-hadoop/README.txt (original)
+++ jackrabbit/sandbox/jackrabbit-hadoop/README.txt Sun Feb 22 22:28:11 2009
@@ -1,11 +1,14 @@
This module provides an implementation of a PersistenceManager
-on top of Hadoop HBase and a DataStore on Hadoop HDFS.
+and a Journal on top of Hadoop HBase and a DataStore on Hadoop HDFS.
+
+The Journal implementation requires a patch to hbase-0.19.0; otherwise,
+RowLocks are not used when a transaction is committed in HBase.
+No issue filed yet, but I'll do that soon.
Building this module requires Java 6, and you need to manually
deploy the two dependencies hbase-0.19.0.jar and hadoop-core-0.19.0.jar.
Simply run 'mvn install' and you will get instructions on how
this can be done.
-The default configuration for both the PersistenceManager as
-well as the DataStore assume that Hadoop is running on
+The default configuration for all modules assumes that Hadoop is running on
hdfs://localhost:9000
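
Both the persistence manager and the journal expose the Hadoop/HBase root
directory as a single 'rootdir' bean parameter. As a minimal illustration
only, it could be set in plain Java like below; in a real deployment
Jackrabbit wires this through repository.xml parameters, and the '/hbase'
path suffix is an assumption on top of the README default.

    import org.apache.jackrabbit.hadoop.journal.HBaseJournal;

    public class JournalConfigSketch {
        public static void main(String[] args) {
            // Sketch only: Jackrabbit normally instantiates and configures
            // the journal via repository.xml bean parameters. The value
            // matches the README default; '/hbase' is an assumed suffix.
            HBaseJournal journal = new HBaseJournal();
            journal.setRootdir("hdfs://localhost:9000/hbase");
        }
    }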
Added: jackrabbit/sandbox/jackrabbit-hadoop/hbase-0.19.0.patch
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/jackrabbit-hadoop/hbase-0.19.0.patch?rev=746838&view=auto
==============================================================================
--- jackrabbit/sandbox/jackrabbit-hadoop/hbase-0.19.0.patch (added)
+++ jackrabbit/sandbox/jackrabbit-hadoop/hbase-0.19.0.patch Sun Feb 22 22:28:11 2009
@@ -0,0 +1,90 @@
+Index: src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+===================================================================
+--- src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (revision 746835)
++++ src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (working copy)
+@@ -1854,7 +1854,7 @@
+ * @return intId Integer row lock used internally in HRegion
+ * @throws IOException Thrown if this is not a valid client lock id.
+ */
+- private Integer getLockFromId(long lockId)
++ protected Integer getLockFromId(long lockId)
+ throws IOException {
+ if(lockId == -1L) {
+ return null;
+Index: src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java
+===================================================================
+--- src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java (revision 746835)
++++ src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java (working copy)
+@@ -348,10 +348,10 @@
+ * @param b
+ * @throws IOException
+ */
+- public void batchUpdate(final long transactionId, final BatchUpdate b)
++ public void batchUpdate(final long transactionId, final BatchUpdate b, Integer lockId)
+ throws IOException {
+ TransactionState state = getTransactionState(transactionId);
+- state.addWrite(b);
++ state.addWrite(b, lockId);
+ logManager.writeUpdateToLog(transactionId, b);
+ }
+
+@@ -500,8 +500,10 @@
+ logManager.writeCommitToLog(state.getTransactionId());
+ }
+
++ List<Integer> lockIds = state.getReadLockSet();
++ int i = 0;
+ for (BatchUpdate update : state.getWriteSet()) {
+- this.batchUpdate(update, false); // Don't need to WAL these
++ this.batchUpdate(update, lockIds.get(i++), false); // Don't need to WAL these
+ // FIXME: maybe should be WALed so we don't need to look so far back.
+ }
+
+Index: src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java
+===================================================================
+--- src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java (revision 746835)
++++ src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java (working copy)
+@@ -143,7 +143,7 @@
+ checkOpen();
+ super.getRequestCount().incrementAndGet();
+ try {
+- getTransactionalRegion(regionName).batchUpdate(transactionId, b);
++ getTransactionalRegion(regionName).batchUpdate(transactionId, b, getLockFromId(b.getRowLock()));
+ } catch (IOException e) {
+ checkFileSystem();
+ throw e;
+Index: src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java
+===================================================================
+--- src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java (revision 746835)
++++ src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java (working copy)
+@@ -64,6 +64,7 @@
+ private SortedSet<byte[]> readSet = new TreeSet<byte[]>(
+ Bytes.BYTES_COMPARATOR);
+ private List<BatchUpdate> writeSet = new LinkedList<BatchUpdate>();
++ private List<Integer> readLockSet = new LinkedList<Integer>();
+ private Set<TransactionState> transactionsToCheck = new HashSet<TransactionState>();
+ private int startSequenceNumber;
+ private Integer sequenceNumber;
+@@ -88,13 +89,22 @@
+ }
+
+ public void addWrite(final BatchUpdate write) {
++ addWrite(write, null);
++ }
++
++ public void addWrite(final BatchUpdate write, final Integer lockId) {
+ writeSet.add(write);
++ readLockSet.add(lockId);
+ }
+
+ public List<BatchUpdate> getWriteSet() {
+ return writeSet;
+ }
+
++ public List<Integer> getReadLockSet() {
++ return readLockSet;
++ }
++
+ /**
+ * GetFull from the writeSet.
+ *
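
The patch makes getLockFromId() accessible to subclasses and threads the
client's lock id through batchUpdate() into the TransactionState write set,
so a row lock taken by the client is reused when the transaction commits
instead of being ignored. A rough client-side sketch of the pattern this
enables (the method and parameter names are placeholders, not names from
this commit; the calls themselves all appear in the code below):

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.RowLock;
    import org.apache.hadoop.hbase.client.transactional.TransactionState;
    import org.apache.hadoop.hbase.client.transactional.TransactionalTable;
    import org.apache.hadoop.hbase.io.BatchUpdate;

    public class RowLockSketch {
        // Sketch of the client-side pattern the patch enables.
        static void lockedUpdate(TransactionalTable table, TransactionState txState,
                                 byte[] row, byte[] column, byte[] value)
                throws IOException {
            RowLock lock = table.lockRow(row);        // explicit client-side row lock
            try {
                BatchUpdate update = new BatchUpdate(row);
                update.put(column, value);
                update.setRowLock(lock.getLockId());  // carry the lock id in the update
                table.commit(txState, update);        // with the patch, commit honours it
            } finally {
                table.unlockRow(lock);                // always release the row lock
            }
        }
    }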
Modified: jackrabbit/sandbox/jackrabbit-hadoop/pom.xml
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/jackrabbit-hadoop/pom.xml?rev=746838&r1=746837&r2=746838&view=diff
==============================================================================
--- jackrabbit/sandbox/jackrabbit-hadoop/pom.xml (original)
+++ jackrabbit/sandbox/jackrabbit-hadoop/pom.xml Sun Feb 22 22:28:11 2009
@@ -21,74 +21,76 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
+ <modelVersion>4.0.0</modelVersion>
-<!-- ====================================================================== -->
-<!-- P R O J E C T D E S C R I P T I O N -->
-<!-- ====================================================================== -->
- <groupId>org.apache.jackrabbit</groupId>
- <artifactId>jackrabbit-hadoop</artifactId>
- <version>1.6-SNAPSHOT</version>
- <name>Jackrabbit on Hadoop and HBase</name>
- <description>Jackrabbit PersistenceManager on HBase and DataStore on Hadoop</description>
+ <!-- ====================================================================== -->
+ <!-- P R O J E C T D E S C R I P T I O N -->
+ <!-- ====================================================================== -->
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>jackrabbit-hadoop</artifactId>
+ <version>1.6-SNAPSHOT</version>
+ <name>Jackrabbit on Hadoop and HBase</name>
+ <description>Jackrabbit PersistenceManager on HBase and DataStore on
+ Hadoop
+ </description>
- <dependencies>
- <dependency>
- <groupId>javax.jcr</groupId>
- <artifactId>jcr</artifactId>
- <version>1.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.jackrabbit</groupId>
- <artifactId>jackrabbit-core</artifactId>
- <version>1.6-SNAPSHOT</version>
- <exclusions>
- <exclusion>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <version>1.5.3</version>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>jcl-over-slf4j</artifactId>
- <version>1.5.3</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hbase</artifactId>
- <version>0.19.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-core</artifactId>
- <version>0.19.0</version>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <target>1.6</target>
- <source>1.6</source>
- </configuration>
- </plugin>
- <plugin>
- <artifactId>maven-idea-plugin</artifactId>
- <version>2.0</version>
- <configuration>
- <downloadSources>true</downloadSources>
- <jdkLevel>1.6</jdkLevel>
- </configuration>
- </plugin>
- </plugins>
- </build>
+ <dependencies>
+ <dependency>
+ <groupId>javax.jcr</groupId>
+ <artifactId>jcr</artifactId>
+ <version>1.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>jackrabbit-core</artifactId>
+ <version>1.6-SNAPSHOT</version>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.5.3</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jcl-over-slf4j</artifactId>
+ <version>1.5.3</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hbase</artifactId>
+ <version>0.19.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <version>0.19.0</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <target>1.6</target>
+ <source>1.6</source>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-idea-plugin</artifactId>
+ <version>2.0</version>
+ <configuration>
+ <downloadSources>true</downloadSources>
+ <jdkLevel>1.6</jdkLevel>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</project>
Modified: jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/HBasePersistenceManager.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/HBasePersistenceManager.java?rev=746838&r1=746837&r2=746838&view=diff
==============================================================================
--- jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/HBasePersistenceManager.java (original)
+++ jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/HBasePersistenceManager.java Sun Feb 22 22:28:11 2009
@@ -36,6 +36,7 @@
import org.apache.jackrabbit.core.persistence.util.Serializer;
import org.apache.jackrabbit.core.NodeId;
import org.apache.jackrabbit.core.NodeIdIterator;
+import org.apache.jackrabbit.core.util.StringIndex;
import org.apache.jackrabbit.core.state.ItemStateException;
import org.apache.jackrabbit.core.state.NodeReferences;
import org.apache.jackrabbit.core.state.NodeReferencesId;
@@ -63,7 +64,11 @@
private String tablePrefix = null;
- private static final byte[] DATA_COLUMN = Bytes.toBytes("data:");
+ static final byte[] DATA_COLUMN = Bytes.toBytes("data:");
+
+ static final byte[] NAME_COLUMN = Bytes.toBytes("name:");
+
+ private StringIndex nameIndex;
private HBaseConfiguration config = new HBaseConfiguration();
@@ -71,6 +76,8 @@
private TransactionalTable noderefs;
+ private TransactionalTable nameIdx;
+
private BundleBinding binding;
private TransactionManager txMgr;
@@ -91,6 +98,7 @@
// set table names
byte[] bundlesTable = Bytes.toBytes(tablePrefix + "bundles");
byte[] nodeRefsTable = Bytes.toBytes(tablePrefix + "noderefs");
+ byte[] namesTable = Bytes.toBytes(tablePrefix + "names");
HBaseAdmin admin = new HBaseAdmin(config);
if (!admin.tableExists(bundlesTable)) {
@@ -105,10 +113,19 @@
admin.createTable(noderefs);
admin.enableTable(nodeRefsTable);
}
+ if (!admin.tableExists(namesTable)) {
+ HTableDescriptor descriptor = new HTableDescriptor(namesTable);
+ descriptor.addFamily(new HColumnDescriptor(NAME_COLUMN));
+ admin.createTable(descriptor);
+ admin.enableTable(namesTable);
+ }
txMgr = new TransactionManager(config);
bundles = new TransactionalTable(config, bundlesTable);
noderefs = new TransactionalTable(config, nodeRefsTable);
+ nameIdx = new TransactionalTable(config, namesTable);
+
+ nameIndex = new HBaseStringIndex(nameIdx);
binding = new BundleBinding(new ErrorHandling(ErrorHandling.IGNORE_MISSING_BLOBS),
null, getNsIndex(), getNameIndex(), context.getDataStore());
@@ -236,6 +253,11 @@
} catch (IOException e) {
// TODO: log
}
+ try {
+ nameIdx.close();
+ } catch (IOException e) {
+ // TODO: log
+ }
}
public boolean exists(NodeReferencesId id) throws ItemStateException {
@@ -293,6 +315,10 @@
}
}
+ public StringIndex getNameIndex() {
+ return nameIndex;
+ }
+
//---------------------------< parameters >---------------------------------
public String getRootdir() {
Added: jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/HBaseStringIndex.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/HBaseStringIndex.java?rev=746838&view=auto
==============================================================================
--- jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/HBaseStringIndex.java (added)
+++ jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/HBaseStringIndex.java Sun Feb 22 22:28:11 2009
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.hadoop;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.io.IOException;
+
+import org.apache.jackrabbit.core.util.StringIndex;
+import org.apache.hadoop.hbase.client.transactional.TransactionalTable;
+import org.apache.hadoop.hbase.client.Scanner;
+import org.apache.hadoop.hbase.client.RowLock;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * <code>HBaseStringIndex</code>...
+ */
+public class HBaseStringIndex implements StringIndex {
+
+ private static final byte[] LOCK_ROW = new byte[]{(byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff};
+
+ private final TransactionalTable table;
+
+ // caches
+ private final Map<String, Integer> string2Index = new HashMap<String, Integer>();
+ private final Map<Integer, String> index2String = new HashMap<Integer, String>();
+
+ public HBaseStringIndex(TransactionalTable table) throws IOException {
+ this.table = table;
+ // make sure lock row exists
+ checkLockRow();
+ // read names
+ readNames(0);
+ }
+
+ public int stringToIndex(String string) throws IllegalArgumentException {
+ Integer index = string2Index.get(string);
+ if (index != null) {
+ return index;
+ }
+ try {
+ return addIfNotFound(string);
+ } catch (IOException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+
+ public String indexToString(int idx) throws IllegalArgumentException {
+ if (idx < 0) {
+ throw new IllegalArgumentException("idx < 0");
+ }
+ String string = index2String.get(idx);
+ if (string != null) {
+ return string;
+ }
+ throw new IllegalArgumentException("Unknown index: " + idx);
+ }
+
+ protected void checkLockRow() throws IOException {
+ Cell cell = table.get(LOCK_ROW, HBasePersistenceManager.NAME_COLUMN);
+ if (cell == null) {
+ BatchUpdate update = new BatchUpdate(LOCK_ROW);
+ update.put(HBasePersistenceManager.NAME_COLUMN, Bytes.toBytes("lock"));
+ table.commit(update);
+ table.flushCommits();
+ }
+ }
+
+ protected int addIfNotFound(String string) throws IOException {
+ RowLock lock = lock();
+ try {
+ // make sure we are up-to-date
+ readNames(string2Index.size());
+ // check if string is present
+ Integer index = string2Index.get(string);
+ if (index == null) {
+ // add it
+ index = string2Index.size();
+ BatchUpdate update = new BatchUpdate(Bytes.toBytes(index));
+ update.put(HBasePersistenceManager.NAME_COLUMN, Bytes.toBytes(string));
+ table.commit(update);
+ table.flushCommits();
+ string2Index.put(string, index);
+ index2String.put(index, string);
+ }
+ return index;
+ } finally {
+ unlock(lock);
+ }
+ }
+
+ protected Scanner getScanner(int start) throws IOException {
+ return table.getScanner(new byte[][]{HBasePersistenceManager.NAME_COLUMN}, Bytes.toBytes(start), LOCK_ROW);
+ }
+
+ protected RowLock lock() throws IOException {
+ return table.lockRow(LOCK_ROW);
+ }
+
+ protected void unlock(RowLock lock) throws IOException {
+ table.unlockRow(lock);
+ }
+
+ protected void readNames(int start) throws IOException {
+ Scanner s = getScanner(start);
+ try {
+ for (RowResult row : s) {
+ int idx = Bytes.toInt(row.getRow());
+ String string = Bytes.toString(row.get(HBasePersistenceManager.NAME_COLUMN).getValue());
+ string2Index.put(string, idx);
+ index2String.put(idx, string);
+ }
+ } finally {
+ s.close();
+ }
+ }
+}
Propchange: jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/HBaseStringIndex.java
------------------------------------------------------------------------------
svn:eol-style = native
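
A minimal usage sketch of the index, assuming 'namesTable' is the open
TransactionalTable the persistence manager creates for the names table
(the JCR name is just an example value):

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.transactional.TransactionalTable;
    import org.apache.jackrabbit.core.util.StringIndex;
    import org.apache.jackrabbit.hadoop.HBaseStringIndex;

    public class StringIndexSketch {
        static void demo(TransactionalTable namesTable) throws IOException {
            StringIndex names = new HBaseStringIndex(namesTable);
            // A string is assigned an index on first use, under the lock row.
            int idx = names.stringToIndex("jcr:primaryType");
            assert "jcr:primaryType".equals(names.indexToString(idx));
        }
    }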
Added: jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/journal/HBaseJournal.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/journal/HBaseJournal.java?rev=746838&view=auto
==============================================================================
--- jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/journal/HBaseJournal.java (added)
+++ jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/journal/HBaseJournal.java Sun Feb 22 22:28:11 2009
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.hadoop.journal;
+
+import java.io.InputStream;
+import java.io.IOException;
+
+import org.apache.jackrabbit.core.journal.AbstractJournal;
+import org.apache.jackrabbit.core.journal.JournalException;
+import org.apache.jackrabbit.core.journal.AppendRecord;
+import org.apache.jackrabbit.core.journal.InstanceRevision;
+import org.apache.jackrabbit.core.journal.RecordIterator;
+import org.apache.jackrabbit.spi.commons.namespace.NamespaceResolver;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.client.transactional.TransactionalTable;
+import org.apache.hadoop.hbase.client.transactional.TransactionManager;
+import org.apache.hadoop.hbase.client.transactional.TransactionState;
+import org.apache.hadoop.hbase.client.transactional.CommitUnsuccessfulException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scanner;
+import org.apache.hadoop.hbase.client.RowLock;
+
+/**
+ * <code>HBaseJournal</code>...
+ */
+public class HBaseJournal extends AbstractJournal {
+
+ static final byte[] JOURNAL_FAMILY = Bytes.toBytes("journal:");
+
+ static final byte[] JOURNAL_ID = Bytes.toBytes("journal:journal_id");
+
+ static final byte[] PRODUCER_ID = Bytes.toBytes("journal:producer_id");
+
+ static final byte[] REVISION_ID = Bytes.toBytes("journal:revision_id");
+
+ static final byte[] REVISION_DATA = Bytes.toBytes("journal:revision_data");
+
+ static final byte[] GLOBAL_REVISION_ROW = Bytes.toBytes(0);
+
+ private HBaseConfiguration config = new HBaseConfiguration();
+
+ private TransactionalTable journal;
+
+ private TransactionalTable localRevisions;
+
+ private TransactionalTable globalRevision;
+
+ private TransactionManager txMgr;
+
+ private TransactionState txState;
+
+ private InstanceRevision instanceRevision;
+
+ private RowLock globalRevisionLock;
+
+ /**
+ * Locked revision.
+ */
+ private long lockedRevision;
+
+ /**
+ * Lock nesting level.
+ */
+ private int lockLevel;
+
+ public void init(String id, NamespaceResolver resolver)
+ throws JournalException {
+ super.init(id, resolver);
+ config.set("hbase.regionserver.class", "org.apache.hadoop.hbase.ipc.TransactionalRegionInterface");
+ config.set("hbase.regionserver.impl", "org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionServer");
+
+ txMgr = new TransactionManager(config);
+ try {
+ initializeTables();
+ instanceRevision = new HBaseRevision(localRevisions, id);
+ } catch (Exception e) {
+ closeTables();
+ throw new JournalException(e.getMessage(), e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p/>
+ * Save away the locked revision inside the newly appended record.
+ */
+ protected void appending(AppendRecord record) {
+ record.setRevision(lockedRevision);
+ }
+
+ protected void append(AppendRecord record, InputStream in, int length)
+ throws JournalException {
+ try {
+ BatchUpdate update = new BatchUpdate(Bytes.toBytes(record.getRevision()));
+ update.put(JOURNAL_ID, Bytes.toBytes(getId()));
+ update.put(PRODUCER_ID, Bytes.toBytes(record.getProducerId()));
+ byte[] data = new byte[length];
+ int remaining = length;
+ while (remaining > 0) {
+ int read = in.read(data, length - remaining, remaining);
+ if (read == -1) {
+ break;
+ } else {
+ remaining -= read;
+ }
+ }
+ update.put(REVISION_DATA, data);
+ journal.commit(txState, update);
+ } catch (IOException e) {
+ throw new JournalException(e.getMessage(), e);
+ }
+ }
+
+ public InstanceRevision getInstanceRevision() throws JournalException {
+ return instanceRevision;
+ }
+
+ public RecordIterator getRecords(long startRevision)
+ throws JournalException {
+ Scanner scanner;
+ try {
+ byte[][] columns = new byte[][]{JOURNAL_ID, PRODUCER_ID, REVISION_DATA};
+ scanner = journal.getScanner(columns, Bytes.toBytes(startRevision + 1));
+ } catch (IOException e) {
+ throw new JournalException(e.getMessage(), e);
+ }
+ try {
+ return new HBaseRecordIterator(scanner, getResolver(), getNamePathResolver());
+ } catch (IOException e) {
+ scanner.close();
+ throw new JournalException(e.getMessage(), e);
+ }
+ }
+
+ public RecordIterator getRecords() throws JournalException {
+ return getRecords(0);
+ }
+
+ protected void doLock() throws JournalException {
+ boolean succeeded = false;
+ try {
+ if (lockLevel++ == 0) {
+ txState = txMgr.beginTransaction();
+ globalRevisionLock = globalRevision.lockRow(GLOBAL_REVISION_ROW);
+ }
+
+ Cell cell = globalRevision.getRow(GLOBAL_REVISION_ROW,
+ new byte[][]{REVISION_ID}, HConstants.LATEST_TIMESTAMP,
+ 1, globalRevisionLock).get(REVISION_ID);
+ long gr = Bytes.toLong(cell.getValue()) + 1;
+ BatchUpdate update = new BatchUpdate(GLOBAL_REVISION_ROW);
+ update.put(REVISION_ID, Bytes.toBytes(gr));
+ update.setRowLock(globalRevisionLock.getLockId());
+ globalRevision.commit(txState, update);
+ lockedRevision = gr;
+
+ succeeded = true;
+ } catch (IOException e) {
+ throw new JournalException(e.getMessage(), e);
+ } finally {
+ if (!succeeded) {
+ doUnlock(false);
+ }
+ }
+ }
+
+ protected void doUnlock(boolean successful) {
+ if (--lockLevel == 0) {
+ if (txState != null) {
+ try {
+ if (successful) {
+ txMgr.tryCommit(txState);
+ } else {
+ txMgr.abort(txState);
+ }
+ } catch (IOException e) {
+ // TODO: log
+ } catch (CommitUnsuccessfulException e) {
+ // TODO: log
+ } finally {
+ txState = null;
+ if (globalRevisionLock != null) {
+ try {
+ globalRevision.unlockRow(globalRevisionLock);
+ } catch (IOException e) {
+ // TODO: log
+ } finally {
+ globalRevisionLock = null;
+ }
+ }
+ }
+ }
+ }
+ }
+
+ public void close() {
+ closeTables();
+ }
+
+ //--------------------------< parameters >----------------------------------
+
+ public String getRootdir() {
+ return config.get("hbase.rootdir");
+ }
+
+ public void setRootdir(String rootdir) {
+ config.set("hbase.rootdir", rootdir);
+ }
+
+ protected void initializeTables() throws IOException {
+ HBaseAdmin admin = new HBaseAdmin(config);
+ byte[] journalTable = Bytes.toBytes("journal");
+ if (!admin.tableExists(journalTable)) {
+ HTableDescriptor descriptor = new HTableDescriptor(journalTable);
+ descriptor.addFamily(new HColumnDescriptor(JOURNAL_FAMILY));
+ admin.createTable(descriptor);
+ admin.enableTable(journalTable);
+ }
+ journal = new TransactionalTable(config, journalTable);
+ byte[] localRevisionsTable = Bytes.toBytes("local_revisions");
+ if (!admin.tableExists(localRevisionsTable)) {
+ HTableDescriptor descriptor = new HTableDescriptor(localRevisionsTable);
+ descriptor.addFamily(new HColumnDescriptor(JOURNAL_FAMILY));
+ admin.createTable(descriptor);
+ admin.enableTable(localRevisionsTable);
+ }
+ localRevisions = new TransactionalTable(config, localRevisionsTable);
+ byte[] globalRevisionTable = Bytes.toBytes("global_revision");
+ if (!admin.tableExists(globalRevisionTable)) {
+ HTableDescriptor descriptor = new HTableDescriptor(globalRevisionTable);
+ descriptor.addFamily(new HColumnDescriptor(JOURNAL_FAMILY));
+ admin.createTable(descriptor);
+ admin.enableTable(globalRevisionTable);
+ }
+ globalRevision = new TransactionalTable(config, globalRevisionTable);
+
+ // make sure initial revision exists
+ if (globalRevision.get(GLOBAL_REVISION_ROW, REVISION_ID) == null) {
+ BatchUpdate update = new BatchUpdate(GLOBAL_REVISION_ROW);
+ update.put(REVISION_ID, new byte[0]);
+ globalRevision.commit(update);
+ globalRevision.flushCommits();
+ }
+ }
+
+ protected void closeTables() {
+ closeTable(journal);
+ closeTable(localRevisions);
+ closeTable(globalRevision);
+ }
+
+ protected static void closeTable(HTable table) {
+ try {
+ if (table != null) {
+ table.close();
+ }
+ } catch (IOException e) {
+ // TODO: log
+ }
+ }
+}
Propchange: jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/journal/HBaseJournal.java
------------------------------------------------------------------------------
svn:eol-style = native
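
From the caller's side, an append drives the doLock()/doUnlock() pair above:
doLock() begins a transaction, row-locks GLOBAL_REVISION_ROW and allocates
the next global revision, which appending() stamps onto the record;
doUnlock() commits or aborts the transaction and releases the lock. A rough
sketch using the standard Jackrabbit RecordProducer API, assuming an
initialized journal ("JR" and the payload are placeholders):

    import org.apache.jackrabbit.core.journal.Journal;
    import org.apache.jackrabbit.core.journal.JournalException;
    import org.apache.jackrabbit.core.journal.Record;
    import org.apache.jackrabbit.core.journal.RecordProducer;

    public class AppendSketch {
        // Rough caller-side sketch; 'journal' is an initialized HBaseJournal.
        static void appendSomething(Journal journal) throws JournalException {
            RecordProducer producer = journal.getProducer("JR");
            Record record = producer.append();   // doLock(): tx + global revision
            try {
                record.writeString("payload");   // illustrative payload
                record.update();                 // doUnlock(true): commit tx
            } catch (JournalException e) {
                record.cancelUpdate();           // doUnlock(false): abort tx
                throw e;
            }
        }
    }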
Added: jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/journal/HBaseRecordIterator.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/journal/HBaseRecordIterator.java?rev=746838&view=auto
==============================================================================
--- jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/journal/HBaseRecordIterator.java (added)
+++ jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/journal/HBaseRecordIterator.java Sun Feb 22 22:28:11 2009
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.hadoop.journal;
+
+import java.util.NoSuchElementException;
+import java.io.IOException;
+import java.io.DataInputStream;
+import java.io.ByteArrayInputStream;
+
+import org.apache.jackrabbit.core.journal.RecordIterator;
+import org.apache.jackrabbit.core.journal.Record;
+import org.apache.jackrabbit.core.journal.JournalException;
+import org.apache.jackrabbit.core.journal.ReadRecord;
+import org.apache.jackrabbit.spi.commons.conversion.NamePathResolver;
+import org.apache.jackrabbit.spi.commons.namespace.NamespaceResolver;
+import org.apache.hadoop.hbase.client.Scanner;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * <code>HBaseRecordIterator</code>...
+ */
+public class HBaseRecordIterator implements RecordIterator {
+
+ private final Scanner scanner;
+
+ private final NamespaceResolver resolver;
+
+ private final NamePathResolver npResolver;
+
+ private Record next;
+
+ public HBaseRecordIterator(Scanner scanner,
+ NamespaceResolver resolver,
+ NamePathResolver npResolver) throws IOException {
+ this.scanner = scanner;
+ this.resolver = resolver;
+ this.npResolver = npResolver;
+ fetchNext();
+ }
+
+ public boolean hasNext() {
+ return next != null;
+ }
+
+ public Record nextRecord() throws NoSuchElementException, JournalException {
+ if (next == null) {
+ throw new NoSuchElementException();
+ }
+ Record rec = next;
+ try {
+ fetchNext();
+ } catch (IOException e) {
+ throw new JournalException(e.getMessage(), e);
+ }
+ return rec;
+ }
+
+ public void close() {
+ scanner.close();
+ }
+
+ protected void fetchNext() throws IOException {
+ next = null;
+ RowResult row = scanner.next();
+ if (row != null) {
+ String journalId = Bytes.toString(row.get(HBaseJournal.JOURNAL_ID).getValue());
+ String producerId = Bytes.toString(row.get(HBaseJournal.PRODUCER_ID).getValue());
+ long revision = Bytes.toLong(row.getRow());
+ byte[] revisionData = row.get(HBaseJournal.REVISION_DATA).getValue();
+ DataInputStream in = new DataInputStream(new ByteArrayInputStream(revisionData));
+ next = new ReadRecord(journalId, producerId, revision,
+ in, revisionData.length, resolver, npResolver);
+ }
+ }
+}
Propchange: jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/journal/HBaseRecordIterator.java
------------------------------------------------------------------------------
svn:eol-style = native
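
For completeness, a sketch of how the iterator is typically consumed through
the journal's getRecords(); 'lastRevision' is a placeholder for whatever
revision the consumer has already processed, and getRecords() scans from
lastRevision + 1:

    import org.apache.jackrabbit.core.journal.Journal;
    import org.apache.jackrabbit.core.journal.JournalException;
    import org.apache.jackrabbit.core.journal.Record;
    import org.apache.jackrabbit.core.journal.RecordIterator;

    public class IterateSketch {
        static void replay(Journal journal, long lastRevision) throws JournalException {
            RecordIterator it = journal.getRecords(lastRevision);
            try {
                while (it.hasNext()) {
                    Record record = it.nextRecord();
                    // consume the record here
                }
            } finally {
                it.close();   // closes the underlying HBase scanner
            }
        }
    }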
Added: jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/journal/HBaseRevision.java
URL: http://svn.apache.org/viewvc/jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/journal/HBaseRevision.java?rev=746838&view=auto
==============================================================================
--- jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/journal/HBaseRevision.java (added)
+++ jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/journal/HBaseRevision.java Sun Feb 22 22:28:11 2009
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.hadoop.journal;
+
+import java.io.IOException;
+
+import org.apache.jackrabbit.core.journal.InstanceRevision;
+import org.apache.jackrabbit.core.journal.JournalException;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+
+/**
+ * <code>HBaseRevision</code>...
+ */
+public class HBaseRevision implements InstanceRevision {
+
+ private final HTable localRevisions;
+
+ private final byte[] journalId;
+
+ private long revision;
+
+ public HBaseRevision(HTable localRevisions, String journalId)
+ throws JournalException {
+ this.localRevisions = localRevisions;
+ this.journalId = Bytes.toBytes(journalId);
+ try {
+ Cell cell = localRevisions.get(this.journalId, HBaseJournal.REVISION_ID);
+ if (cell == null) {
+ set(0);
+ } else {
+ revision = Bytes.toLong(cell.getValue());
+ }
+ } catch (IOException e) {
+ throw new JournalException(e.getMessage(), e);
+ }
+ }
+
+ public synchronized long get() throws JournalException {
+ return revision;
+ }
+
+ public synchronized void set(long value) throws JournalException {
+ BatchUpdate update = new BatchUpdate(journalId);
+ update.put(HBaseJournal.REVISION_ID, Bytes.toBytes(value));
+ try {
+ localRevisions.commit(update);
+ } catch (IOException e) {
+ throw new JournalException(e.getMessage(), e);
+ }
+ revision = value;
+ }
+
+ public void close() {
+ }
+}
Propchange: jackrabbit/sandbox/jackrabbit-hadoop/src/main/java/org/apache/jackrabbit/hadoop/journal/HBaseRevision.java
------------------------------------------------------------------------------
svn:eol-style = native
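
The instance revision records how far this cluster node has caught up with
the journal. A minimal sketch of the intended use, assuming an initialized
journal ('processed' is a placeholder for the revision just consumed):

    import org.apache.jackrabbit.core.journal.InstanceRevision;
    import org.apache.jackrabbit.core.journal.JournalException;

    public class RevisionSketch {
        static void advance(InstanceRevision rev, long processed) throws JournalException {
            long last = rev.get();      // last revision this instance processed
            if (processed > last) {
                rev.set(processed);     // persisted via a BatchUpdate keyed by journalId
            }
        }
    }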