Posted to commits@hama.apache.org by tj...@apache.org on 2012/09/19 13:52:24 UTC
svn commit: r1387533 [4/10] - in /hama/trunk: ./ core/
core/src/main/java/org/apache/hama/bsp/ graph/
graph/src/main/java/org/apache/hama/graph/ jdbm/ jdbm/src/ jdbm/src/main/
jdbm/src/main/java/ jdbm/src/main/java/org/ jdbm/src/main/java/org/apache/
j...
Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/HTreeBucket.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/HTreeBucket.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/HTreeBucket.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/HTreeBucket.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.jdbm;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+
+/**
+ * A bucket is a placeholder for multiple (key, value) pairs. Buckets are used
+ * to store collisions (same hash value) at all levels of an H*tree.
+ * <p/>
+ * There are two types of buckets: leaf and non-leaf.
+ * <p/>
+ * Non-leaf buckets hold collisions which happen while the H*tree is not yet
+ * fully expanded. Keys in a non-leaf bucket can have different hash codes.
+ * Non-leaf buckets are limited to a fixed maximum size. When this
+ * limit is reached, the H*tree should create a new HTreeDirectory node and
+ * distribute keys of the non-leaf buckets into the newly created
+ * HTreeDirectory.
+ * <p/>
+ * A leaf bucket is a bucket which contains keys which all have the same
+ * <code>hashCode()</code>. Leaf buckets stand at the bottom of an H*tree
+ * because the hashing algorithm cannot further discriminate between different
+ * keys based on their hash code.
+ */
+public final class HTreeBucket<K, V> {
+
+ /**
+ * The maximum number of elements (key, value) a non-leaf bucket can contain.
+ */
+ public static final int OVERFLOW_SIZE = 16;
+
+ /**
+ * Depth of this bucket.
+ */
+ private byte _depth;
+
+ /**
+   * Keys and values in this bucket. The value for the key at index i is
+   * stored at index i + OVERFLOW_SIZE.
+ */
+ private Object[] _keysAndValues;
+
+ private byte size = 0;
+
+ private final HTree<K, V> tree;
+
+ /**
+ * Public constructor for serialization.
+ */
+ public HTreeBucket(HTree<K, V> tree) {
+ this.tree = tree;
+ }
+
+ /**
+   * Construct a bucket with a given depth level. The depth level is the number
+   * of <code>HTreeDirectory</code> nodes above this bucket.
+ */
+ public HTreeBucket(HTree<K, V> tree, byte level) {
+ this.tree = tree;
+ if (level > HTreeDirectory.MAX_DEPTH + 1) {
+ throw new IllegalArgumentException(
+ "Cannot create bucket with depth > MAX_DEPTH+1. " + "Depth=" + level);
+ }
+ _depth = level;
+ _keysAndValues = new Object[OVERFLOW_SIZE * 2];
+ }
+
+ /**
+ * Returns the number of elements contained in this bucket.
+ */
+ public int getElementCount() {
+ return size;
+ }
+
+ /**
+ * Returns whether or not this bucket is a "leaf bucket".
+ */
+ public boolean isLeaf() {
+ return (_depth > HTreeDirectory.MAX_DEPTH);
+ }
+
+ /**
+ * Returns true if bucket can accept at least one more element.
+ */
+ public boolean hasRoom() {
+ if (isLeaf()) {
+ return true; // leaf buckets are never full
+ } else {
+ // non-leaf bucket
+ return (size < OVERFLOW_SIZE);
+ }
+ }
+
+ /**
+ * Add an element (key, value) to this bucket. If an existing element has the
+ * same key, it is replaced silently.
+ *
+ * @return Object which was previously associated with the given key or
+ * <code>null</code> if no association existed.
+ */
+ public V addElement(K key, V value) {
+ // find entry
+ byte existing = -1;
+ for (byte i = 0; i < size; i++) {
+ if (key.equals(_keysAndValues[i])) {
+ existing = i;
+ break;
+ }
+ }
+
+ if (existing != -1) {
+ // replace existing element
+ Object before = _keysAndValues[existing + OVERFLOW_SIZE];
+ if (before instanceof BTreeLazyRecord) {
+ BTreeLazyRecord<V> rec = (BTreeLazyRecord<V>) before;
+ before = rec.get();
+ rec.delete();
+ }
+ _keysAndValues[existing + OVERFLOW_SIZE] = value;
+ return (V) before;
+ } else {
+ // add new (key, value) pair
+ _keysAndValues[size] = key;
+ _keysAndValues[size + OVERFLOW_SIZE] = value;
+ size++;
+ return null;
+ }
+ }
+
+ /**
+ * Remove an element, given a specific key.
+ *
+ * @param key Key of the element to remove
+ * @return Removed element value, or <code>null</code> if not found
+ */
+ public V removeElement(K key) {
+ // find entry
+ byte existing = -1;
+ for (byte i = 0; i < size; i++) {
+ if (key.equals(_keysAndValues[i])) {
+ existing = i;
+ break;
+ }
+ }
+
+ if (existing != -1) {
+ Object o = _keysAndValues[existing + OVERFLOW_SIZE];
+ if (o instanceof BTreeLazyRecord) {
+ BTreeLazyRecord<V> rec = (BTreeLazyRecord<V>) o;
+ o = rec.get();
+ rec.delete();
+ }
+
+ // move last element to existing
+ size--;
+ _keysAndValues[existing] = _keysAndValues[size];
+ _keysAndValues[existing + OVERFLOW_SIZE] = _keysAndValues[size
+ + OVERFLOW_SIZE];
+
+ // and unset last element
+ _keysAndValues[size] = null;
+ _keysAndValues[size + OVERFLOW_SIZE] = null;
+
+ return (V) o;
+ } else {
+ // not found
+ return null;
+ }
+ }
+
+ /**
+ * Returns the value associated with a given key. If the given key is not
+ * found in this bucket, returns <code>null</code>.
+ */
+ public V getValue(K key) {
+ // find entry
+ byte existing = -1;
+ for (byte i = 0; i < size; i++) {
+ if (key.equals(_keysAndValues[i])) {
+ existing = i;
+ break;
+ }
+ }
+
+ if (existing != -1) {
+ Object o = _keysAndValues[existing + OVERFLOW_SIZE];
+ if (o instanceof BTreeLazyRecord)
+ return ((BTreeLazyRecord<V>) o).get();
+ else
+ return (V) o;
+ } else {
+ // key not found
+ return null;
+ }
+ }
+
+ /**
+   * Obtain the keys contained in this bucket. Keys are ordered to match their
+   * values, which can be obtained by calling <code>getValues()</code>.
+   * <p/>
+   * A new list is built on each call, so callers may modify it freely.
+ */
+ ArrayList<K> getKeys() {
+ ArrayList<K> ret = new ArrayList<K>();
+ for (byte i = 0; i < size; i++) {
+ ret.add((K) _keysAndValues[i]);
+ }
+ return ret;
+ }
+
+ /**
+   * Obtain the values contained in this bucket. Values are ordered to match
+   * their keys, which can be obtained by calling <code>getKeys()</code>.
+   * <p/>
+   * A new list is built on each call, so callers may modify it freely.
+ */
+ ArrayList<V> getValues() {
+ ArrayList<V> ret = new ArrayList<V>();
+ for (byte i = 0; i < size; i++) {
+ ret.add((V) _keysAndValues[i + OVERFLOW_SIZE]);
+ }
+    return ret;
+  }
+
+ public void writeExternal(DataOutput out) throws IOException {
+ out.write(_depth);
+ out.write(size);
+
+ DataInputOutput out3 = tree.writeBufferCache.getAndSet(null);
+ if (out3 == null)
+ out3 = new DataInputOutput();
+ else
+ out3.reset();
+
+ Serializer keySerializer = tree.keySerializer != null ? tree.keySerializer
+ : tree.getRecordManager().defaultSerializer();
+ for (byte i = 0; i < size; i++) {
+ out3.reset();
+ keySerializer.serialize(out3, _keysAndValues[i]);
+ LongPacker.packInt(out, out3.getPos());
+ out.write(out3.getBuf(), 0, out3.getPos());
+
+ }
+
+ // write values
+ if (tree.hasValues()) {
+ Serializer valSerializer = tree.valueSerializer != null ? tree.valueSerializer
+ : tree.getRecordManager().defaultSerializer();
+
+ for (byte i = 0; i < size; i++) {
+ Object value = _keysAndValues[i + OVERFLOW_SIZE];
+ if (value == null) {
+ out.write(BTreeLazyRecord.NULL);
+ } else if (value instanceof BTreeLazyRecord) {
+ out.write(BTreeLazyRecord.LAZY_RECORD);
+ LongPacker.packLong(out, ((BTreeLazyRecord) value).recid);
+ } else {
+ // transform to byte array
+ out3.reset();
+ valSerializer.serialize(out3, value);
+
+ if (out3.getPos() > BTreeLazyRecord.MAX_INTREE_RECORD_SIZE) {
+ // store as separate record
+ long recid = tree.getRecordManager().insert(out3.toByteArray(),
+ BTreeLazyRecord.FAKE_SERIALIZER, true);
+ out.write(BTreeLazyRecord.LAZY_RECORD);
+ LongPacker.packLong(out, recid);
+ } else {
+ out.write(out3.getPos());
+ out.write(out3.getBuf(), 0, out3.getPos());
+ }
+ }
+ }
+ }
+ tree.writeBufferCache.set(out3);
+
+ }
+
+ public void readExternal(DataInputOutput in) throws IOException,
+ ClassNotFoundException {
+ _depth = in.readByte();
+ size = in.readByte();
+
+ // read keys
+ Serializer keySerializer = tree.keySerializer != null ? tree.keySerializer
+ : tree.getRecordManager().defaultSerializer();
+ _keysAndValues = (K[]) new Object[OVERFLOW_SIZE * 2];
+ for (byte i = 0; i < size; i++) {
+ int expectedSize = LongPacker.unpackInt(in);
+ K key = (K) BTreeLazyRecord.fastDeser(in, keySerializer, expectedSize);
+ _keysAndValues[i] = key;
+ }
+
+ // read values
+ if (tree.hasValues()) {
+ Serializer<V> valSerializer = tree.valueSerializer != null ? tree.valueSerializer
+ : (Serializer<V>) tree.getRecordManager().defaultSerializer();
+ for (byte i = 0; i < size; i++) {
+ int header = in.readUnsignedByte();
+ if (header == BTreeLazyRecord.NULL) {
+ _keysAndValues[i + OVERFLOW_SIZE] = null;
+ } else if (header == BTreeLazyRecord.LAZY_RECORD) {
+ long recid = LongPacker.unpackLong(in);
+ _keysAndValues[i + OVERFLOW_SIZE] = (new BTreeLazyRecord(
+ tree.getRecordManager(), recid, valSerializer));
+ } else {
+ _keysAndValues[i + OVERFLOW_SIZE] = BTreeLazyRecord.fastDeser(in,
+ valSerializer, header);
+ }
+ }
+ } else {
+ for (byte i = 0; i < size; i++) {
+ if (_keysAndValues[i] != null)
+ _keysAndValues[i + OVERFLOW_SIZE] = JDBMUtils.EMPTY_STRING;
+ }
+ }
+ }
+}
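
Note: a minimal in-memory sketch (illustrative only, not part of this commit) of the
parallel-array layout HTreeBucket uses: the key at slot i has its value at slot
i + OVERFLOW_SIZE, which is why addElement() writes both cells together.

    // Minimal in-memory sketch of HTreeBucket's parallel-array layout;
    // the key at index i has its value at index i + OVERFLOW_SIZE.
    public class BucketLayoutSketch {
      static final int OVERFLOW_SIZE = 16; // same constant as HTreeBucket

      private final Object[] keysAndValues = new Object[OVERFLOW_SIZE * 2];
      private int size = 0;

      Object put(Object key, Object value) {
        for (int i = 0; i < size; i++) {
          if (key.equals(keysAndValues[i])) {      // existing key: swap value in place
            Object old = keysAndValues[i + OVERFLOW_SIZE];
            keysAndValues[i + OVERFLOW_SIZE] = value;
            return old;
          }
        }
        keysAndValues[size] = key;                 // new pair appended at index `size`
        keysAndValues[size + OVERFLOW_SIZE] = value;
        size++;
        return null;
      }
    }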
Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/HTreeDirectory.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/HTreeDirectory.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/HTreeDirectory.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/HTreeDirectory.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,601 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.jdbm;
+
+import java.io.DataOutput;
+import java.io.IOError;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * Hashtable directory page.
+ */
+public final class HTreeDirectory<K, V> {
+
+ /**
+ * Maximum number of children in a directory.
+ * <p/>
+ * (Must be a power of 2 -- if you update this value, you must also update
+ * BIT_SIZE and MAX_DEPTH.)
+ * <p/>
+   * !!!! do not change this, it affects the storage format; there are also
+   * magic numbers which rely on 255 !!!
+ */
+ static final int MAX_CHILDREN = 256;
+
+ /**
+ * Number of significant bits per directory level.
+ */
+ static final int BIT_SIZE = 8; // log2(256) = 8
+
+ /**
+ * Maximum number of levels (zero-based)
+ * <p/>
+   * (4 * 8 bits = 32 bits, which is the size of an <code>int</code>, and
+   * hash codes in Java are <code>int</code>s)
+ */
+ static final int MAX_DEPTH = 3; // 4 levels
+
+ /**
+   * Record ids of child nodes, stored as a two-dimensional array to save
+   * memory; unused subarrays may be null.
+ */
+ private long[][] _children;
+
+ /**
+ * Depth of this directory page, zero-based
+ */
+ private byte _depth;
+
+ /**
+ * This directory's record ID in the DB. (transient)
+ */
+ private long _recid;
+
+  /** if this is the root (depth=0), this field holds the tree size, otherwise -1 */
+ long size;
+
+ protected final HTree<K, V> tree;
+
+ /**
+ * Public constructor used by serialization
+ */
+ public HTreeDirectory(HTree<K, V> tree) {
+ this.tree = tree;
+ }
+
+ /**
+   * Construct an HTreeDirectory.
+ *
+ * @param depth Depth of this directory node.
+ */
+ HTreeDirectory(HTree<K, V> tree, byte depth) {
+ this.tree = tree;
+ _depth = depth;
+ _children = new long[32][];
+ }
+
+ /**
+ * Sets persistence context. This method must be called before any
+ * persistence-related operation.
+ *
+ * @param recid Record id of this directory.
+ */
+ void setPersistenceContext(long recid) {
+ this._recid = recid;
+ }
+
+ /**
+ * Get the record identifier used to load this hashtable.
+ */
+ long getRecid() {
+ return _recid;
+ }
+
+ /**
+ * Returns whether or not this directory is empty. A directory is empty when
+ * it no longer contains buckets or sub-directories.
+ */
+ boolean isEmpty() {
+ for (int i = 0; i < _children.length; i++) {
+ long[] sub = _children[i];
+ if (sub != null) {
+ for (int j = 0; j < 8; j++) {
+ if (sub[j] != 0) {
+ return false;
+ }
+ }
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Returns the value which is associated with the given key. Returns
+   * <code>null</code> if there is no association for this key.
+ *
+ * @param key key whose associated value is to be returned
+ */
+ V get(K key) throws IOException {
+ int hash = hashCode(key);
+ long child_recid = getRecid(hash);
+ if (child_recid == 0) {
+ // not bucket/node --> not found
+ return null;
+ } else {
+ Object node = tree.db.fetch(child_recid, tree.SERIALIZER);
+ // System.out.println("HashDirectory.get() child is : "+node);
+
+ if (node instanceof HTreeDirectory) {
+ // recurse into next directory level
+ HTreeDirectory<K, V> dir = (HTreeDirectory<K, V>) node;
+ dir.setPersistenceContext(child_recid);
+ return dir.get(key);
+ } else {
+ // node is a bucket
+ HTreeBucket<K, V> bucket = (HTreeBucket) node;
+ return bucket.getValue(key);
+ }
+ }
+ }
+
+ private long getRecid(int hash) {
+ long[] sub = _children[hash >>> 3];
+ return sub == null ? 0 : sub[hash % 8];
+ }
+
+ private void putRecid(int hash, long recid) {
+ long[] sub = _children[hash >>> 3];
+ if (sub == null) {
+ sub = new long[8];
+ _children[hash >>> 3] = sub;
+ }
+ sub[hash % 8] = recid;
+ }
+
+ /**
+ * Associates the specified value with the specified key.
+ *
+   * @param key key with which the specified value is to be associated.
+ * @param value value to be associated with the specified key.
+ * @return object which was previously associated with the given key, or
+ * <code>null</code> if no association existed.
+ */
+ Object put(final Object key, final Object value) throws IOException {
+ if (value == null) {
+ return remove(key);
+ }
+ int hash = hashCode(key);
+ long child_recid = getRecid(hash);
+ if (child_recid == 0) {
+ // no bucket/node here yet, let's create a bucket
+ HTreeBucket bucket = new HTreeBucket(tree, (byte) (_depth + 1));
+
+ // insert (key,value) pair in bucket
+ Object existing = bucket.addElement(key, value);
+
+ long b_recid = tree.db.insert(bucket, tree.SERIALIZER, false);
+ putRecid(hash, b_recid);
+
+ tree.db.update(_recid, this, tree.SERIALIZER);
+
+ // System.out.println("Added: "+bucket);
+ return existing;
+ } else {
+ Object node = tree.db.fetch(child_recid, tree.SERIALIZER);
+
+ if (node instanceof HTreeDirectory) {
+ // recursive insert in next directory level
+ HTreeDirectory dir = (HTreeDirectory) node;
+ dir.setPersistenceContext(child_recid);
+ return dir.put(key, value);
+ } else {
+ // node is a bucket
+ HTreeBucket bucket = (HTreeBucket) node;
+ if (bucket.hasRoom()) {
+ Object existing = bucket.addElement(key, value);
+ tree.db.update(child_recid, bucket, tree.SERIALIZER);
+ // System.out.println("Added: "+bucket);
+ return existing;
+ } else {
+ // overflow, so create a new directory
+ if (_depth == MAX_DEPTH) {
+ throw new RuntimeException("Cannot create deeper directory. "
+ + "Depth=" + _depth);
+ }
+ HTreeDirectory dir = new HTreeDirectory(tree, (byte) (_depth + 1));
+ long dir_recid = tree.db.insert(dir, tree.SERIALIZER, false);
+ dir.setPersistenceContext(dir_recid);
+
+ putRecid(hash, dir_recid);
+ tree.db.update(_recid, this, tree.SERIALIZER);
+
+          // discard the overflowed bucket
+ tree.db.delete(child_recid);
+
+ // migrate existing bucket elements
+ ArrayList keys = bucket.getKeys();
+ ArrayList values = bucket.getValues();
+ int entries = keys.size();
+ for (int i = 0; i < entries; i++) {
+ dir.put(keys.get(i), values.get(i));
+ }
+
+ // (finally!) insert new element
+ return dir.put(key, value);
+ }
+ }
+ }
+ }
+
+ /**
+ * Remove the value which is associated with the given key. If the key does
+ * not exist, this method simply ignores the operation.
+ *
+ * @param key key whose associated value is to be removed
+ * @return object which was associated with the given key, or
+ * <code>null</code> if no association existed with given key.
+ */
+ Object remove(Object key) throws IOException {
+ int hash = hashCode(key);
+ long child_recid = getRecid(hash);
+ if (child_recid == 0) {
+ // not bucket/node --> not found
+ return null;
+ } else {
+ Object node = tree.db.fetch(child_recid, tree.SERIALIZER);
+ // System.out.println("HashDirectory.remove() child is : "+node);
+
+ if (node instanceof HTreeDirectory) {
+ // recurse into next directory level
+ HTreeDirectory dir = (HTreeDirectory) node;
+ dir.setPersistenceContext(child_recid);
+ Object existing = dir.remove(key);
+ if (existing != null) {
+ if (dir.isEmpty()) {
+ // delete empty directory
+ tree.db.delete(child_recid);
+ putRecid(hash, 0);
+ tree.db.update(_recid, this, tree.SERIALIZER);
+ }
+ }
+ return existing;
+ } else {
+ // node is a bucket
+ HTreeBucket bucket = (HTreeBucket) node;
+ Object existing = bucket.removeElement(key);
+ if (existing != null) {
+ if (bucket.getElementCount() >= 1) {
+ tree.db.update(child_recid, bucket, tree.SERIALIZER);
+ } else {
+ // delete bucket, it's empty
+ tree.db.delete(child_recid);
+ putRecid(hash, 0);
+ tree.db.update(_recid, this, tree.SERIALIZER);
+ }
+ }
+ return existing;
+ }
+ }
+ }
+
+ /**
+ * Calculates the hashcode of a key, based on the current directory depth.
+ */
+ private int hashCode(Object key) {
+ int hashMask = hashMask();
+ int hash = key.hashCode();
+ hash = hash & hashMask;
+ hash = hash >>> ((MAX_DEPTH - _depth) * BIT_SIZE);
+ hash = hash % MAX_CHILDREN;
+ /*
+ * System.out.println("HashDirectory.hashCode() is: 0x"
+ * +Integer.toHexString(hash) +" for object hashCode() 0x"
+ * +Integer.toHexString(key.hashCode()));
+ */
+ return hash;
+ }
+
+ /**
+ * Calculates the hashmask of this directory. The hashmask is the bit mask
+ * applied to a hashcode to retain only bits that are relevant to this
+ * directory level.
+ */
+ int hashMask() {
+ int bits = MAX_CHILDREN - 1;
+ int hashMask = bits << ((MAX_DEPTH - _depth) * BIT_SIZE);
+ /*
+ * System.out.println("HashDirectory.hashMask() is: 0x"
+ * +Integer.toHexString(hashMask));
+ */
+ return hashMask;
+ }
+
+ /**
+   * Returns an iterator over the keys contained in this directory and its
+   * children.
+ */
+ Iterator<K> keys() throws IOException {
+ return new HDIterator(true);
+ }
+
+ /**
+   * Returns an iterator over the values contained in this directory and its
+   * children.
+ */
+ Iterator<V> values() throws IOException {
+ return new HDIterator(false);
+ }
+
+ public void writeExternal(DataOutput out) throws IOException {
+ out.writeByte(_depth);
+ if (_depth == 0) {
+ LongPacker.packLong(out, size);
+ }
+
+ int zeroStart = 0;
+ for (int i = 0; i < MAX_CHILDREN; i++) {
+ if (getRecid(i) != 0) {
+ zeroStart = i;
+ break;
+ }
+ }
+
+ out.write(zeroStart);
+ if (zeroStart == MAX_CHILDREN)
+ return;
+
+ int zeroEnd = 0;
+ for (int i = MAX_CHILDREN - 1; i >= 0; i--) {
+ if (getRecid(i) != 0) {
+ zeroEnd = i;
+ break;
+ }
+ }
+ out.write(zeroEnd);
+
+ for (int i = zeroStart; i <= zeroEnd; i++) {
+ LongPacker.packLong(out, getRecid(i));
+ }
+ }
+
+ public void readExternal(DataInputOutput in) throws IOException,
+ ClassNotFoundException {
+ _depth = in.readByte();
+ if (_depth == 0)
+ size = LongPacker.unpackLong(in);
+ else
+ size = -1;
+
+ _children = new long[32][];
+ int zeroStart = in.readUnsignedByte();
+ int zeroEnd = in.readUnsignedByte();
+
+ for (int i = zeroStart; i <= zeroEnd; i++) {
+ long recid = LongPacker.unpackLong(in);
+ if (recid != 0)
+ putRecid(i, recid);
+ }
+
+ }
+
+ public void defrag(DBStore r1, DBStore r2) throws IOException,
+ ClassNotFoundException {
+ for (long[] sub : _children) {
+ if (sub == null)
+ continue;
+ for (long child : sub) {
+ if (child == 0)
+ continue;
+ byte[] data = r1.fetchRaw(child);
+ r2.forceInsert(child, data);
+ Object t = tree.SERIALIZER.deserialize(new DataInputOutput(data));
+ if (t instanceof HTreeDirectory) {
+ ((HTreeDirectory) t).defrag(r1, r2);
+ }
+ }
+ }
+ }
+
+ void deleteAllChildren() throws IOException {
+ for (long[] ll : _children) {
+ if (ll != null) {
+ for (long l : ll) {
+ if (l != 0) {
+ tree.db.delete(l);
+ }
+ }
+ }
+ }
+
+ }
+
+ // //////////////////////////////////////////////////////////////////////
+ // INNER CLASS
+ // //////////////////////////////////////////////////////////////////////
+
+ /**
+ * Utility class to enumerate keys/values in a HTree
+ */
+ class HDIterator<A> implements Iterator<A> {
+
+ /**
+ * True if we're iterating on keys, False if enumerating on values.
+ */
+ private boolean _iterateKeys;
+
+ /**
+ * Stacks of directories & last enumerated child position
+ */
+ private ArrayList _dirStack;
+ private ArrayList _childStack;
+
+ /**
+ * Current HashDirectory in the hierarchy
+ */
+ private HTreeDirectory _dir;
+
+ /**
+ * Current child position
+ */
+ private int _child;
+
+ /**
+ * Current bucket iterator
+ */
+ private Iterator<A> _iter;
+
+ private A next;
+
+ /**
+ * last item returned in next(), is used to remove() last item
+ */
+ private A last;
+
+ private int expectedModCount;
+
+ /**
+ * Construct an iterator on this directory.
+ *
+     * @param iterateKeys True if the iteration supplies keys, False if it
+     * supplies values.
+ */
+ HDIterator(boolean iterateKeys) throws IOException {
+ _dirStack = new ArrayList();
+ _childStack = new ArrayList();
+ _dir = HTreeDirectory.this;
+ _child = -1;
+ _iterateKeys = iterateKeys;
+ expectedModCount = tree.modCount;
+
+ prepareNext();
+ next = next2();
+
+ }
+
+ /**
+ * Returns the next object.
+ */
+ public A next2() {
+ A next = null;
+ if (_iter != null && _iter.hasNext()) {
+ next = _iter.next();
+ } else {
+ try {
+ prepareNext();
+ } catch (IOException except) {
+ throw new IOError(except);
+ }
+ if (_iter != null && _iter.hasNext()) {
+ return next2();
+ }
+ }
+ return next;
+ }
+
+ /**
+     * Prepare internal state so we can answer <code>hasNext()</code>.
+     * <p/>
+     * This code prepares an iterator on the next bucket to enumerate. If no
+     * following bucket is found, iteration ends.
+ */
+ private void prepareNext() throws IOException {
+ long child_recid = 0;
+
+ // get next bucket/directory to enumerate
+ do {
+ _child++;
+ if (_child >= MAX_CHILDREN) {
+
+ if (_dirStack.isEmpty()) {
+ // no more directory in the stack, we're finished
+ return;
+ }
+
+ // try next node
+ _dir = (HTreeDirectory) _dirStack.remove(_dirStack.size() - 1);
+ _child = ((Integer) _childStack.remove(_childStack.size() - 1))
+ .intValue();
+ continue;
+ }
+ child_recid = _dir.getRecid(_child);
+ } while (child_recid == 0);
+
+ if (child_recid == 0) {
+ throw new Error("child_recid cannot be 0");
+ }
+
+ Object node = tree.db.fetch(child_recid, tree.SERIALIZER);
+ // System.out.println("HDEnumeration.get() child is : "+node);
+
+ if (node instanceof HTreeDirectory) {
+ // save current position
+ _dirStack.add(_dir);
+ _childStack.add(new Integer(_child));
+
+ _dir = (HTreeDirectory) node;
+ _child = -1;
+
+ // recurse into
+ _dir.setPersistenceContext(child_recid);
+ prepareNext();
+ } else {
+ // node is a bucket
+ HTreeBucket bucket = (HTreeBucket) node;
+ if (_iterateKeys) {
+ ArrayList keys2 = bucket.getKeys();
+ _iter = keys2.iterator();
+ } else {
+ _iter = bucket.getValues().iterator();
+ }
+ }
+ }
+
+ public boolean hasNext() {
+ return next != null;
+ }
+
+ public A next() {
+ if (next == null)
+ throw new NoSuchElementException();
+ if (expectedModCount != tree.modCount)
+ throw new ConcurrentModificationException();
+ last = next;
+ next = next2();
+ return last;
+ }
+
+ public void remove() {
+ if (last == null)
+ throw new IllegalStateException();
+
+ if (expectedModCount != tree.modCount)
+ throw new ConcurrentModificationException();
+
+ // TODO current delete behaviour may change node layout. INVESTIGATE if
+ // this can happen!
+ tree.remove(last);
+ last = null;
+ expectedModCount++;
+ }
+ }
+
+}
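
The hashCode()/hashMask() pair above slices one byte out of a key's 32-bit hash
per directory level: depth 0 reads the most significant byte, depth 3 the least
significant. A small self-contained sketch of that arithmetic (constants copied
from HTreeDirectory; class name is illustrative):

    // Self-contained sketch of the per-level hash slicing in HTreeDirectory.
    public class HashSliceSketch {
      static final int MAX_CHILDREN = 256, BIT_SIZE = 8, MAX_DEPTH = 3;

      static int slice(int hash, int depth) {
        int mask = (MAX_CHILDREN - 1) << ((MAX_DEPTH - depth) * BIT_SIZE);
        return ((hash & mask) >>> ((MAX_DEPTH - depth) * BIT_SIZE)) % MAX_CHILDREN;
      }

      public static void main(String[] args) {
        int h = 0xAABBCCDD;
        // prints 0xAA, 0xBB, 0xCC, 0xDD: one byte per directory level
        for (int d = 0; d <= MAX_DEPTH; d++)
          System.out.printf("depth %d -> 0x%02X%n", d, slice(h, d));
      }
    }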
Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/HTreeSet.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/HTreeSet.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/HTreeSet.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/HTreeSet.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.util.AbstractSet;
+import java.util.Iterator;
+
+/**
+ * Wrapper around HTree to implement the java.util.Set interface.
+ */
+public final class HTreeSet<E> extends AbstractSet<E> {
+
+ final HTree<E, Object> map;
+
+ HTreeSet(HTree<E, Object> map) {
+ this.map = map;
+ }
+
+ @Override
+ public Iterator<E> iterator() {
+ return map.keySet().iterator();
+ }
+
+ @Override
+ public int size() {
+ return map.size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return map.isEmpty();
+ }
+
+ @Override
+ public boolean contains(Object o) {
+ return map.containsKey(o);
+ }
+
+ @Override
+ public boolean add(E e) {
+ return map.put(e, JDBMUtils.EMPTY_STRING) == null;
+ }
+
+ @Override
+ public boolean remove(Object o) {
+ return map.remove(o) == JDBMUtils.EMPTY_STRING;
+ }
+
+ @Override
+ public void clear() {
+ map.clear();
+ }
+
+}
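
A minimal usage sketch for this set view; the db handle and the createHashSet
factory method below are assumptions for illustration and do not appear in this
commit:

    // Hypothetical usage; `db` and createHashSet(...) are assumed, not shown here.
    HTreeSet<String> names = db.createHashSet("names");
    names.add("alice");                        // stored as map.put("alice", EMPTY_STRING)
    boolean present = names.contains("alice"); // delegates to map.containsKey
    names.remove("alice");                     // true only if the sentinel value matches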
Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/JDBMUtils.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/JDBMUtils.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/JDBMUtils.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/JDBMUtils.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOError;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+
+import javax.crypto.Cipher;
+
+/**
+ * Various utilities used in JDBM
+ */
+public final class JDBMUtils {
+
+ /**
+   * The empty string is used as a dummy value to represent null values in the
+   * hash-set and tree-set wrappers.
+ */
+ static final String EMPTY_STRING = "";
+
+ public static byte[] encrypt(Cipher cipherIn, ByteBuffer b) {
+ if (cipherIn == null && b.hasArray())
+ return b.array();
+ byte[] bb = new byte[Storage.PAGE_SIZE];
+ b.rewind();
+ b.get(bb, 0, Storage.PAGE_SIZE);
+ return encrypt(cipherIn, bb);
+ }
+
+ public static byte[] encrypt(Cipher cipherIn, byte[] b) {
+ if (cipherIn == null)
+ return b;
+
+ try {
+ return cipherIn.doFinal(b);
+ } catch (Exception e) {
+ throw new IOError(e);
+ }
+
+ }
+
+ /**
+   * Compares comparables. The default comparator for most Java types.
+ */
+ static final Comparator COMPARABLE_COMPARATOR = new Comparator<Comparable>() {
+ public int compare(Comparable o1, Comparable o2) {
+ return o1 == null && o2 != null ? -1 : (o1 != null && o2 == null ? 1 : o1
+ .compareTo(o2));
+ }
+ };
+
+ static String formatSpaceUsage(long size) {
+ if (size < 1e4)
+ return size + "B";
+ else if (size < 1e7)
+ return "" + Math.round(1D * size / 1024D) + "KB";
+ else if (size < 1e10)
+ return "" + Math.round(1D * size / 1e6) + "MB";
+ else
+ return "" + Math.round(1D * size / 1e9) + "GB";
+ }
+
+ static boolean allZeros(byte[] b) {
+ for (int i = 0; i < b.length; i++) {
+ if (b[i] != 0)
+ return false;
+ }
+ return true;
+ }
+
+ static <E> E max(E e1, E e2, Comparator comp) {
+ if (e1 == null)
+ return e2;
+ if (e2 == null)
+ return e1;
+
+ if (comp == null)
+ comp = COMPARABLE_COMPARATOR;
+ return comp.compare(e1, e2) < 0 ? e2 : e1;
+ }
+
+ static <E> E min(E e1, E e2, Comparator comp) {
+ if (e1 == null)
+ return e2;
+ if (e2 == null)
+ return e1;
+
+ if (comp == null)
+ comp = COMPARABLE_COMPARATOR;
+
+ return comp.compare(e1, e2) > 0 ? e2 : e1;
+ }
+
+ static final Serializer<Object> NULL_SERIALIZER = new Serializer<Object>() {
+ public void serialize(DataOutput out, Object obj) throws IOException {
+ out.writeByte(11);
+ }
+
+ public Object deserialize(DataInput in) throws IOException,
+ ClassNotFoundException {
+ in.readByte();
+ return null;
+ }
+ };
+
+}
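
As a quick check of the formatSpaceUsage() thresholds above (the demo class is
illustrative; it must live in org.apache.hama.jdbm because the helper is
package-private):

    package org.apache.hama.jdbm;

    // Illustrative check of JDBMUtils.formatSpaceUsage(); expected output inline.
    public class FormatSpaceUsageSketch {
      public static void main(String[] args) {
        System.out.println(JDBMUtils.formatSpaceUsage(512L));         // "512B"
        System.out.println(JDBMUtils.formatSpaceUsage(2000000L));     // "1953KB" (2e6/1024, rounded)
        System.out.println(JDBMUtils.formatSpaceUsage(5000000000L));  // "5000MB" (5e9/1e6)
        System.out.println(JDBMUtils.formatSpaceUsage(20000000000L)); // "20GB"   (2e10/1e9)
      }
    }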
Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/LinkedList.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/LinkedList.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/LinkedList.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/LinkedList.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,482 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.jdbm;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOError;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.AbstractSequentialList;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.ListIterator;
+import java.util.NoSuchElementException;
+
+/**
+ * LinkedList which stores its nodes on disk.
+ */
+public final class LinkedList<E> extends AbstractSequentialList<E> {
+
+ private DBAbstract db;
+
+ final long rootRecid;
+ /**
+   * Size limit; not currently used, but kept for forward compatibility. Zero
+   * means no limit.
+ */
+ long sizeLimit = 0;
+
+ static final class Root {
+ long first;
+ long last;
+ long size;
+ }
+
+ private static final Serializer<Root> ROOT_SERIALIZER = new Serializer<Root>() {
+
+ @Override
+ public void serialize(DataOutput out, Root obj) throws IOException {
+ LongPacker.packLong(out, obj.first);
+ LongPacker.packLong(out, obj.last);
+ LongPacker.packLong(out, obj.size);
+ }
+
+ @Override
+ public Root deserialize(DataInput in) throws IOException,
+ ClassNotFoundException {
+ Root r = new Root();
+ r.first = LongPacker.unpackLong(in);
+ r.last = LongPacker.unpackLong(in);
+ r.size = LongPacker.unpackLong(in);
+ return r;
+ }
+ };
+
+ private Serializer<E> valueSerializer;
+
+ /**
+ * indicates that entry values should not be loaded during deserialization,
+ * used during defragmentation
+ */
+ protected boolean loadValues = true;
+
+ /** constructor used for deserialization */
+ LinkedList(DBAbstract db, long rootRecid, Serializer<E> valueSerializer) {
+ this.db = db;
+ this.rootRecid = rootRecid;
+ this.valueSerializer = valueSerializer;
+ }
+
+ /** constructor used to create new empty list */
+ LinkedList(DBAbstract db, Serializer<E> valueSerializer) throws IOException {
+ this.db = db;
+ if (valueSerializer != null && !(valueSerializer instanceof Serializable))
+ throw new IllegalArgumentException(
+ "Serializer does not implement Serializable");
+ this.valueSerializer = valueSerializer;
+ // create root
+ this.rootRecid = db.insert(new Root(), ROOT_SERIALIZER, false);
+ }
+
+ void setPersistenceContext(DBAbstract db) {
+ this.db = db;
+ }
+
+ @Override
+ public ListIterator<E> listIterator(int index) {
+ Root r = getRoot();
+ if (index < 0 || index > r.size)
+ throw new IndexOutOfBoundsException();
+
+ Iter iter = new Iter();
+ iter.next = r.first;
+
+ // scroll to requested position
+ // TODO scroll from end, if beyond half
+ for (int i = 0; i < index; i++) {
+ iter.next();
+ }
+ return iter;
+
+ }
+
+ Root getRoot() {
+ // expect that caller already holds lock
+ try {
+ return db.fetch(rootRecid, ROOT_SERIALIZER);
+ } catch (IOException e) {
+ throw new IOError(e);
+ }
+ }
+
+ @Override
+ public int size() {
+ return (int) getRoot().size;
+ }
+
+ public Iterator<E> descendingIterator() {
+    return null; // TODO descending iteration is not implemented yet
+ }
+
+ @Override
+ public boolean add(Object value) {
+ try {
+ Root r = getRoot();
+ Entry e = new Entry(r.last, 0, value);
+ long recid = db.insert(e, entrySerializer, false);
+
+ // update old last Entry to point to new record
+ if (r.last != 0) {
+ Entry oldLast = db.fetch(r.last, entrySerializer);
+ if (oldLast.next != 0)
+ throw new Error();
+ oldLast.next = recid;
+ db.update(r.last, oldLast, entrySerializer);
+ }
+
+ // update linked list
+ r.last = recid;
+ if (r.first == 0)
+ r.first = recid;
+ r.size++;
+ db.update(rootRecid, r, ROOT_SERIALIZER);
+ modCount++;
+ return true;
+ } catch (IOException e) {
+ throw new IOError(e);
+ }
+
+ }
+
+ private Entry<E> fetch(long recid) {
+ try {
+ return db.fetch(recid, entrySerializer);
+ } catch (IOException e) {
+ throw new IOError(e);
+ }
+
+ }
+
+ /**
+ * called from Serialization object
+ */
+ static LinkedList deserialize(DataInput is, Serialization ser)
+ throws IOException, ClassNotFoundException {
+ long rootrecid = LongPacker.unpackLong(is);
+ long sizeLimit = LongPacker.unpackLong(is);
+ if (sizeLimit != 0)
+ throw new InternalError(
+ "LinkedList.sizeLimit not supported in this JDBM version");
+ Serializer serializer = (Serializer) ser.deserialize(is);
+ return new LinkedList(ser.db, rootrecid, serializer);
+ }
+
+ void serialize(DataOutput out) throws IOException {
+ LongPacker.packLong(out, rootRecid);
+ LongPacker.packLong(out, sizeLimit);
+ db.defaultSerializer().serialize(out, valueSerializer);
+ }
+
+ private final Serializer<Entry> entrySerializer = new Serializer<Entry>() {
+
+ @Override
+ public void serialize(DataOutput out, Entry e) throws IOException {
+ LongPacker.packLong(out, e.prev);
+ LongPacker.packLong(out, e.next);
+ if (valueSerializer != null)
+ valueSerializer.serialize(out, (E) e.value);
+ else
+ db.defaultSerializer().serialize(out, e.value);
+ }
+
+ @Override
+ public Entry<E> deserialize(DataInput in) throws IOException,
+ ClassNotFoundException {
+ long prev = LongPacker.unpackLong(in);
+ long next = LongPacker.unpackLong(in);
+ Object value = null;
+ if (loadValues)
+ value = valueSerializer == null ? db.defaultSerializer()
+ .deserialize(in) : valueSerializer.deserialize(in);
+ return new LinkedList.Entry(prev, next, value);
+ }
+ };
+
+ static class Entry<E> {
+ long prev = 0;
+ long next = 0;
+
+ E value;
+
+ public Entry(long prev, long next, E value) {
+ this.prev = prev;
+ this.next = next;
+ this.value = value;
+ }
+ }
+
+ private final class Iter implements ListIterator<E> {
+
+ private int expectedModCount = modCount;
+ private int index = 0;
+
+ private long prev = 0;
+ private long next = 0;
+
+ private byte lastOper = 0;
+
+ @Override
+ public boolean hasNext() {
+ return next != 0;
+ }
+
+ @Override
+ public E next() {
+ if (next == 0)
+ throw new NoSuchElementException();
+ checkForComodification();
+
+ Entry<E> e = fetch(next);
+
+ prev = next;
+ next = e.next;
+ index++;
+ lastOper = +1;
+ return e.value;
+ }
+
+ @Override
+ public boolean hasPrevious() {
+ return prev != 0;
+ }
+
+ @Override
+ public E previous() {
+ checkForComodification();
+ Entry<E> e = fetch(prev);
+ next = prev;
+ prev = e.prev;
+ index--;
+ lastOper = -1;
+ return e.value;
+ }
+
+ @Override
+ public int nextIndex() {
+ return index;
+ }
+
+ @Override
+ public int previousIndex() {
+ return index - 1;
+ }
+
+ @Override
+ public void remove() {
+ checkForComodification();
+ try {
+ if (lastOper == 1) {
+ // last operation was next() so remove previous element
+ lastOper = 0;
+
+ Entry<E> p = db.fetch(prev, entrySerializer);
+ // update entry before previous
+ if (p.prev != 0) {
+ Entry<E> pp = db.fetch(p.prev, entrySerializer);
+ pp.next = p.next;
+ db.update(p.prev, pp, entrySerializer);
+ }
+ // update entry after next
+ if (p.next != 0) {
+ Entry<E> pn = db.fetch(p.next, entrySerializer);
+ pn.prev = p.prev;
+ db.update(p.next, pn, entrySerializer);
+ }
+ // remove old record from db
+ db.delete(prev);
+ // update list
+ Root r = getRoot();
+ if (r.first == prev)
+ r.first = next;
+ if (r.last == prev)
+ r.last = next;
+ r.size--;
+ db.update(rootRecid, r, ROOT_SERIALIZER);
+ modCount++;
+ expectedModCount++;
+ // update iterator
+ prev = p.prev;
+
+ } else if (lastOper == -1) {
+ // last operation was prev() so remove next element
+ lastOper = 0;
+
+ Entry<E> n = db.fetch(next, entrySerializer);
+ // update entry before next
+ if (n.prev != 0) {
+ Entry<E> pp = db.fetch(n.prev, entrySerializer);
+ pp.next = n.next;
+ db.update(n.prev, pp, entrySerializer);
+ }
+ // update entry after previous
+ if (n.next != 0) {
+ Entry<E> pn = db.fetch(n.next, entrySerializer);
+ pn.prev = n.prev;
+ db.update(n.next, pn, entrySerializer);
+ }
+ // remove old record from db
+ db.delete(next);
+ // update list
+ Root r = getRoot();
+ if (r.last == next)
+ r.last = prev;
+ if (r.first == next)
+ r.first = prev;
+ r.size--;
+ db.update(rootRecid, r, ROOT_SERIALIZER);
+ modCount++;
+ expectedModCount++;
+ // update iterator
+ next = n.next;
+
+ } else
+ throw new IllegalStateException();
+ } catch (IOException e) {
+ throw new IOError(e);
+ }
+
+ }
+
+ @Override
+ public void set(E value) {
+ checkForComodification();
+ try {
+ if (lastOper == 1) {
+ // last operation was next(), so update previous item
+ lastOper = 0;
+ Entry<E> n = db.fetch(prev, entrySerializer);
+ n.value = value;
+ db.update(prev, n, entrySerializer);
+ } else if (lastOper == -1) {
+ // last operation was prev() so update next item
+ lastOper = 0;
+ Entry<E> n = db.fetch(next, entrySerializer);
+ n.value = value;
+ db.update(next, n, entrySerializer);
+ } else
+ throw new IllegalStateException();
+ } catch (IOException e) {
+ throw new IOError(e);
+ }
+
+ }
+
+ @Override
+ public void add(E value) {
+ checkForComodification();
+
+ // use more efficient method if possible
+ if (next == 0) {
+ LinkedList.this.add(value);
+ expectedModCount++;
+ return;
+ }
+ try {
+ // insert new entry
+ Entry<E> e = new Entry<E>(prev, next, value);
+ long recid = db.insert(e, entrySerializer, false);
+
+ // update previous entry
+ if (prev != 0) {
+ Entry<E> p = db.fetch(prev, entrySerializer);
+ if (p.next != next)
+ throw new Error();
+ p.next = recid;
+ db.update(prev, p, entrySerializer);
+ }
+
+ // update next entry
+ Entry<E> n = fetch(next);
+ if (n.prev != prev)
+ throw new Error();
+ n.prev = recid;
+ db.update(next, n, entrySerializer);
+
+ // update List
+ Root r = getRoot();
+ r.size++;
+ db.update(rootRecid, r, ROOT_SERIALIZER);
+
+ // update iterator
+ expectedModCount++;
+ modCount++;
+ prev = recid;
+
+ } catch (IOException e) {
+ throw new IOError(e);
+ }
+
+ }
+
+ final void checkForComodification() {
+ if (modCount != expectedModCount)
+ throw new ConcurrentModificationException();
+ }
+ }
+
+ /**
+   * Copies the collection from one db to another, while keeping logical recids
+   * unchanged.
+ */
+ static void defrag(long recid, DBStore r1, DBStore r2) throws IOException {
+ try {
+ // move linked list itself
+ byte[] data = r1.fetchRaw(recid);
+ r2.forceInsert(recid, data);
+ DataInputOutput in = new DataInputOutput();
+ in.reset(data);
+ LinkedList l = (LinkedList) r1.defaultSerializer().deserialize(in);
+ l.loadValues = false;
+ // move linkedlist root
+ if (l.rootRecid == 0) // empty list, done
+ return;
+
+ data = r1.fetchRaw(l.rootRecid);
+ r2.forceInsert(l.rootRecid, data);
+ in.reset(data);
+ Root r = ROOT_SERIALIZER.deserialize(in);
+ // move all other nodes in linked list
+ long current = r.first;
+ while (current != 0) {
+ data = r1.fetchRaw(current);
+ in.reset(data);
+ r2.forceInsert(current, data);
+
+ Entry e = (Entry) l.entrySerializer.deserialize(in);
+ current = e.next;
+ }
+ } catch (ClassNotFoundException e) {
+ throw new IOError(e);
+ }
+
+ }
+
+}
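
The add() and listIterator() logic above boils down to a doubly-linked list
whose pointers are recids in the record store. A self-contained, in-memory
analogue (a HashMap standing in for the db, recids as increasing longs; names
are illustrative):

    import java.util.HashMap;
    import java.util.Map;

    // In-memory analogue of the on-disk list: Root holds first/last recids,
    // each Entry holds prev/next recids; mirrors LinkedList.add() above.
    public class DiskListSketch {
      static class Entry {
        long prev, next;
        String value;
        Entry(long prev, long next, String value) {
          this.prev = prev; this.next = next; this.value = value;
        }
      }
      static class Root { long first, last, size; }

      public static void main(String[] args) {
        Map<Long, Entry> store = new HashMap<Long, Entry>(); // stands in for the db
        Root root = new Root();
        long nextRecid = 1;                                  // recid 0 means "none"
        for (String v : new String[] { "a", "b", "c" }) {
          long recid = nextRecid++;
          store.put(recid, new Entry(root.last, 0, v));      // link back to old tail
          if (root.last != 0)
            store.get(root.last).next = recid;               // old tail points forward
          root.last = recid;
          if (root.first == 0)
            root.first = recid;
          root.size++;
        }
        for (long cur = root.first; cur != 0; cur = store.get(cur).next)
          System.out.println(store.get(cur).value);          // prints a, b, c
      }
    }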
Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/LogicalRowIdManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/LogicalRowIdManager.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/LogicalRowIdManager.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/LogicalRowIdManager.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * This class manages the linked lists of logical rowid pages.
+ */
+public final class LogicalRowIdManager {
+ // our record file and associated page manager
+ private final PageFile file;
+ private final PageManager pageman;
+ static final short ELEMS_PER_PAGE = (short) ((Storage.PAGE_SIZE - Magic.PAGE_HEADER_SIZE) / Magic.PhysicalRowId_SIZE);
+
+ private long[] freeRecordsInTransRowid = new long[4];
+ private int freeRecordsInTransSize = 0;
+
+  /** number of free logical rowids on a logical free page, stored as a short */
+ static final int OFFSET_FREE_COUNT = Magic.PAGE_HEADER_SIZE;
+ static final int FREE_HEADER_SIZE = Magic.PAGE_HEADER_SIZE + Magic.SZ_SHORT;
+  /** maximum number of free logical rowids per page */
+ static final int FREE_RECORDS_PER_PAGE = (Storage.PAGE_SIZE - FREE_HEADER_SIZE) / 6;
+
+ /**
+   * Creates a logical rowid manager using the indicated record file and page
+ * manager
+ */
+ LogicalRowIdManager(PageFile file, PageManager pageman) throws IOException {
+ this.file = file;
+ this.pageman = pageman;
+ }
+
+ /**
+ * Creates a new logical rowid pointing to the indicated physical id
+ *
+ * @param physloc physical location to point to
+ * @return logical recid
+ */
+ long insert(final long physloc) throws IOException {
+ // check whether there's a free rowid to reuse
+ long retval = getFreeSlot();
+ if (retval == 0) {
+ // no. This means that we bootstrap things by allocating
+ // a new translation page and freeing all the rowids on it.
+ long firstPage = pageman.allocate(Magic.TRANSLATION_PAGE);
+ short curOffset = Magic.PAGE_HEADER_SIZE;
+ for (int i = 0; i < ELEMS_PER_PAGE; i++) {
+ putFreeSlot(((-firstPage) << Storage.PAGE_SIZE_SHIFT)
+ + (long) curOffset);
+
+ curOffset += Magic.PhysicalRowId_SIZE;
+ }
+
+ retval = getFreeSlot();
+ if (retval == 0) {
+ throw new Error("couldn't obtain free translation");
+ }
+ }
+ // write the translation.
+ update(retval, physloc);
+ return retval;
+ }
+
+ /**
+ * Insert at forced location, use only for defragmentation !!
+ *
+ * @param logicalRowId
+ * @param physLoc
+ * @throws IOException
+ */
+ void forceInsert(final long logicalRowId, final long physLoc)
+ throws IOException {
+ if (fetch(logicalRowId) != 0)
+ throw new Error("can not forceInsert, record already exists: "
+ + logicalRowId);
+
+ update(logicalRowId, physLoc);
+ }
+
+ /**
+ * Releases the indicated logical rowid.
+ */
+ void delete(final long logicalrowid) throws IOException {
+ // zero out old location, is needed for defragmentation
+ final long pageId = -(logicalrowid >>> Storage.PAGE_SIZE_SHIFT);
+ final PageIo xlatPage = file.get(pageId);
+ xlatPage.pageHeaderSetLocation(
+ (short) (logicalrowid & Storage.OFFSET_MASK), 0);
+ file.release(pageId, true);
+ putFreeSlot(logicalrowid);
+ }
+
+ /**
+ * Updates the mapping
+ *
+ * @param logicalrowid The logical rowid
+ * @param physloc The physical rowid
+ */
+ void update(final long logicalrowid, final long physloc) throws IOException {
+
+ final long pageId = -(logicalrowid >>> Storage.PAGE_SIZE_SHIFT);
+ final PageIo xlatPage = file.get(pageId);
+ xlatPage.pageHeaderSetLocation(
+ (short) (logicalrowid & Storage.OFFSET_MASK), physloc);
+ file.release(pageId, true);
+ }
+
+ /**
+ * Returns a mapping
+ *
+ * @param logicalrowid The logical rowid
+ * @return The physical rowid, 0 if does not exist
+ */
+ long fetch(long logicalrowid) throws IOException {
+ final long pageId = -(logicalrowid >>> Storage.PAGE_SIZE_SHIFT);
+ final long last = pageman.getLast(Magic.TRANSLATION_PAGE);
+ if (last - 1 > pageId)
+ return 0;
+
+ final short offset = (short) (logicalrowid & Storage.OFFSET_MASK);
+
+ final PageIo xlatPage = file.get(pageId);
+ final long ret = xlatPage.pageHeaderGetLocation(offset);
+
+ file.release(pageId, false);
+ return ret;
+ }
+
+ void commit() throws IOException {
+ if (freeRecordsInTransSize == 0)
+ return;
+
+ long freeRecPageId = pageman.getLast(Magic.FREELOGIDS_PAGE);
+ if (freeRecPageId == 0) {
+ // allocate new
+ freeRecPageId = pageman.allocate(Magic.FREELOGIDS_PAGE);
+ }
+ PageIo freeRecPage = file.get(freeRecPageId);
+    // write all uncommitted free records
+ for (int rowPos = 0; rowPos < freeRecordsInTransSize; rowPos++) {
+ short count = freeRecPage.readShort(OFFSET_FREE_COUNT);
+ if (count == FREE_RECORDS_PER_PAGE) {
+ // allocate new free recid page
+ file.release(freeRecPage);
+ freeRecPageId = pageman.allocate(Magic.FREELOGIDS_PAGE);
+ freeRecPage = file.get(freeRecPageId);
+ freeRecPage.writeShort(FREE_RECORDS_PER_PAGE, (short) 0);
+ count = 0;
+ }
+ final int offset = (count) * 6 + FREE_HEADER_SIZE;
+ // write free recid and increase counter
+ freeRecPage.writeSixByteLong(offset, freeRecordsInTransRowid[rowPos]);
+ count++;
+ freeRecPage.writeShort(OFFSET_FREE_COUNT, count);
+
+ }
+ file.release(freeRecPage);
+
+ clearFreeRecidsInTransaction();
+ }
+
+ private void clearFreeRecidsInTransaction() {
+ if (freeRecordsInTransRowid.length > 128)
+ freeRecordsInTransRowid = new long[4];
+ freeRecordsInTransSize = 0;
+ }
+
+ void rollback() throws IOException {
+ clearFreeRecidsInTransaction();
+ }
+
+ /**
+ * Returns a free Logical rowid, or 0 if nothing was found.
+ */
+ long getFreeSlot() throws IOException {
+ if (freeRecordsInTransSize != 0) {
+ return freeRecordsInTransRowid[--freeRecordsInTransSize];
+ }
+
+ final long logicFreePageId = pageman.getLast(Magic.FREELOGIDS_PAGE);
+ if (logicFreePageId == 0) {
+ return 0;
+ }
+ PageIo logicFreePage = file.get(logicFreePageId);
+ short recCount = logicFreePage.readShort(OFFSET_FREE_COUNT);
+ if (recCount <= 0) {
+ throw new InternalError();
+ }
+
+ final int offset = (recCount - 1) * 6 + FREE_HEADER_SIZE;
+ final long ret = logicFreePage.readSixByteLong(offset);
+
+ recCount--;
+
+ if (recCount > 0) {
+ // decrease counter and zero out old record
+ logicFreePage.writeSixByteLong(offset, 0);
+ logicFreePage.writeShort(OFFSET_FREE_COUNT, recCount);
+ file.release(logicFreePage);
+ } else {
+ // release this page
+ file.release(logicFreePage);
+ pageman.free(Magic.FREELOGIDS_PAGE, logicFreePageId);
+ }
+
+ return ret;
+ }
+
+ /**
+ * Puts the indicated rowid on the free list
+ */
+ void putFreeSlot(long rowid) throws IOException {
+ // ensure capacity
+ if (freeRecordsInTransSize == freeRecordsInTransRowid.length)
+ freeRecordsInTransRowid = Arrays.copyOf(freeRecordsInTransRowid,
+ freeRecordsInTransRowid.length * 4);
+ // add record and increase size
+ freeRecordsInTransRowid[freeRecordsInTransSize] = rowid;
+ freeRecordsInTransSize++;
+ }
+
+}
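
The insert()/fetch() pair above packs a translation page id and an in-page
offset into a single logical rowid. A sketch of that packing follows;
Storage.PAGE_SIZE_SHIFT and Storage.OFFSET_MASK are not part of this excerpt,
so the 4 KB page size below is an assumption:

    // Sketch of logical-rowid packing; PAGE_SIZE_SHIFT/OFFSET_MASK values assumed.
    public class RowIdPackingSketch {
      static final int PAGE_SIZE_SHIFT = 12;                 // assumes 4096-byte pages
      static final long OFFSET_MASK = (1L << PAGE_SIZE_SHIFT) - 1;

      static long pack(long translationPageId, short offset) {
        // translation pages carry negative ids, hence the negation on both sides
        return ((-translationPageId) << PAGE_SIZE_SHIFT) + offset;
      }

      public static void main(String[] args) {
        long rowid = pack(-7L, (short) 20);                  // page -7, offset 20
        long pageId = -(rowid >>> PAGE_SIZE_SHIFT);
        short offset = (short) (rowid & OFFSET_MASK);
        System.out.println(pageId + " / " + offset);         // prints -7 / 20
      }
    }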
Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/LongHashMap.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/LongHashMap.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/LongHashMap.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/LongHashMap.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,418 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.jdbm;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * Hash map which uses a primitive long as its key. The main advantage is that
+ * a new Long instance does not have to be created for each lookup.
+ * <p/>
+ * This code comes from Android, which in turn comes from Apache Harmony. This
+ * class was modified to use primitive longs and stripped down to consume less
+ * space.
+ * <p/>
+ * Author of JDBM modifications: Jan Kotek
+ */
+public final class LongHashMap<V> implements Serializable {
+ private static final long serialVersionUID = 362499999763181265L;
+
+ private int elementCount;
+
+ private Entry<V>[] elementData;
+
+ private final float loadFactor;
+
+ private int threshold;
+
+ private int defaultSize = 16;
+
+ private transient Entry<V> reuseAfterDelete = null;
+
+ static final class Entry<V> implements Serializable {
+ private static final long serialVersionUID = 362445231113181265L;
+
+ Entry<V> next;
+
+ V value;
+
+ long key;
+
+ Entry(long theKey) {
+ this.key = theKey;
+ this.value = null;
+ }
+
+ }
+
+ static class HashMapIterator<V> implements Iterator<V> {
+ private int position = 0;
+
+ boolean canRemove = false;
+
+ Entry<V> entry;
+
+ Entry<V> lastEntry;
+
+ final LongHashMap<V> associatedMap;
+
+ HashMapIterator(LongHashMap<V> hm) {
+ associatedMap = hm;
+ }
+
+ public boolean hasNext() {
+ if (entry != null) {
+ return true;
+ }
+
+ Entry<V>[] elementData = associatedMap.elementData;
+ int length = elementData.length;
+ int newPosition = position;
+ boolean result = false;
+
+ while (newPosition < length) {
+ if (elementData[newPosition] == null) {
+ newPosition++;
+ } else {
+ result = true;
+ break;
+ }
+ }
+
+ position = newPosition;
+ return result;
+ }
+
+ public V next() {
+
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ Entry<V> result;
+ Entry<V> _entry = entry;
+ if (_entry == null) {
+ result = lastEntry = associatedMap.elementData[position++];
+ entry = lastEntry.next;
+ } else {
+ if (lastEntry.next != _entry) {
+ lastEntry = lastEntry.next;
+ }
+ result = _entry;
+ entry = _entry.next;
+ }
+ canRemove = true;
+ return result.value;
+ }
+
+ public void remove() {
+ if (!canRemove) {
+ throw new IllegalStateException();
+ }
+
+ canRemove = false;
+
+ if (lastEntry.next == entry) {
+ while (associatedMap.elementData[--position] == null) {
+ // Do nothing
+ }
+ associatedMap.elementData[position] = associatedMap.elementData[position].next;
+ entry = null;
+ } else {
+ lastEntry.next = entry;
+ }
+ if (lastEntry != null) {
+ Entry<V> reuse = lastEntry;
+ lastEntry = null;
+ reuse.key = Long.MIN_VALUE;
+ reuse.value = null;
+ associatedMap.reuseAfterDelete = reuse;
+ }
+
+ associatedMap.elementCount--;
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private Entry<V>[] newElementArray(int s) {
+ return new Entry[s];
+ }
+
+ /**
+ * Constructs a new empty {@code HashMap} instance.
+ *
+ * @since Android 1.0
+ */
+ public LongHashMap() {
+ this(16);
+ }
+
+ /**
+ * Constructs a new {@code HashMap} instance with the specified capacity.
+ *
+ * @param capacity the initial capacity of this hash map.
+ * @throws IllegalArgumentException when the capacity is less than zero.
+ * @since Android 1.0
+ */
+ public LongHashMap(int capacity) {
+ defaultSize = capacity;
+ if (capacity >= 0) {
+ elementCount = 0;
+ elementData = newElementArray(capacity == 0 ? 1 : capacity);
+ loadFactor = 0.75f; // Default load factor of 0.75
+ computeMaxSize();
+ } else {
+ throw new IllegalArgumentException();
+ }
+ }
+
+ // BEGIN android-changed
+
+ /**
+ * Removes all mappings from this hash map, leaving it empty.
+ *
+ * @see #isEmpty
+ * @see #size
+ * @since Android 1.0
+ */
+
+ public void clear() {
+ if (elementCount > 0) {
+ elementCount = 0;
+ }
+ if (elementData.length > 1024 && elementData.length > defaultSize)
+ elementData = newElementArray(defaultSize);
+ else
+ Arrays.fill(elementData, null);
+ computeMaxSize();
+ }
+
+ // END android-changed
+
+ /**
+ * Recomputes the resize threshold from the current capacity and load factor.
+ */
+ private void computeMaxSize() {
+ threshold = (int) (elementData.length * loadFactor);
+ }
+
+ /**
+ * Returns the value of the mapping with the specified key.
+ *
+ * @param key the key.
+ * @return the value of the mapping with the specified key, or {@code null} if
+ * no mapping for the specified key is found.
+ * @since Android 1.0
+ */
+
+ public V get(final long key) {
+
+ final int hash = powerHash(key);
+ final int index = (hash & 0x7FFFFFFF) % elementData.length;
+
+ // find non null entry
+ Entry<V> m = elementData[index];
+ while (m != null) {
+ if (key == m.key)
+ return m.value;
+ m = m.next;
+ }
+
+ return null;
+
+ }
+
+ /**
+ * Returns whether this map is empty.
+ *
+ * @return {@code true} if this map has no elements, {@code false} otherwise.
+ * @see #size()
+ * @since Android 1.0
+ */
+
+ public boolean isEmpty() {
+ return elementCount == 0;
+ }
+
+ // Iterator over keys; commented out in this stripped-down port:
+ // public Iterator<K> keyIterator(){
+ // return new HashMapIterator<K, K, V>(
+ // new MapEntry.Type<K, K, V>() {
+ // public K get(Entry<K, V> entry) {
+ // return entry.key;
+ // }
+ // }, HashMap.this);
+ //
+ // }
+
+ /**
+ * Maps the specified key to the specified value.
+ *
+ * @param key the key.
+ * @param value the value.
+ * @return the value of any previous mapping with the specified key or
+ * {@code null} if there was no such mapping.
+ * @since Android 1.0
+ */
+
+ public V put(final long key, final V value) {
+
+ int hash = powerHash(key);
+ int index = (hash & 0x7FFFFFFF) % elementData.length;
+
+ // find non null entry
+ Entry<V> entry = elementData[index];
+ while (entry != null && key != entry.key) {
+ entry = entry.next;
+ }
+
+ if (entry == null) {
+ if (++elementCount > threshold) {
+ rehash();
+ index = (hash & 0x7FFFFFFF) % elementData.length;
+ }
+ entry = createHashedEntry(key, index);
+ }
+
+ V result = entry.value;
+ entry.value = value;
+ return result;
+ }
+
+ Entry<V> createHashedEntry(final long key, final int index) {
+ Entry<V> entry = reuseAfterDelete;
+ if (entry == null) {
+ entry = new Entry<V>(key);
+ } else {
+ reuseAfterDelete = null;
+ entry.key = key;
+ entry.value = null;
+ }
+
+ entry.next = elementData[index];
+ elementData[index] = entry;
+ return entry;
+ }
+
+ void rehash(final int capacity) {
+ int length = (capacity == 0 ? 1 : capacity << 1);
+
+ Entry<V>[] newData = newElementArray(length);
+ for (int i = 0; i < elementData.length; i++) {
+ Entry<V> entry = elementData[i];
+ while (entry != null) {
+ int index = ((int) powerHash(entry.key) & 0x7FFFFFFF) % length;
+ Entry<V> next = entry.next;
+ entry.next = newData[index];
+ newData[index] = entry;
+ entry = next;
+ }
+ }
+ elementData = newData;
+ computeMaxSize();
+ }
+
+ void rehash() {
+ rehash(elementData.length);
+ }
+
+ /**
+ * Removes the mapping with the specified key from this map.
+ *
+ * @param key the key of the mapping to remove.
+ * @return the value of the removed mapping or {@code null} if no mapping for
+ * the specified key was found.
+ * @since Android 1.0
+ */
+
+ public V remove(final long key) {
+ Entry<V> entry = removeEntry(key);
+ if (entry == null)
+ return null;
+ V ret = entry.value;
+ entry.value = null;
+ entry.key = Long.MIN_VALUE;
+ reuseAfterDelete = entry;
+
+ return ret;
+ }
+
+ Entry<V> removeEntry(final long key) {
+ Entry<V> last = null;
+
+ final int hash = powerHash(key);
+ final int index = (hash & 0x7FFFFFFF) % elementData.length;
+ Entry<V> entry = elementData[index];
+
+ while (true) {
+ if (entry == null) {
+ return null;
+ }
+
+ if (key == entry.key) {
+ if (last == null) {
+ elementData[index] = entry.next;
+ } else {
+ last.next = entry.next;
+ }
+ elementCount--;
+ return entry;
+ }
+
+ last = entry;
+ entry = entry.next;
+ }
+ }
+
+ /**
+ * Returns the number of elements in this map.
+ *
+ * @return the number of elements in this map.
+ * @since Android 1.0
+ */
+
+ public int size() {
+ return elementCount;
+ }
+
+ /**
+ * @return an iterator over the values in this map
+ */
+ public Iterator<V> valuesIterator() {
+ return new HashMapIterator<V>(this);
+
+ }
+
+ private static int powerHash(final long key) {
+ int h = (int) (key ^ (key >>> 32));
+ h ^= (h >>> 20) ^ (h >>> 12);
+ return h ^ (h >>> 7) ^ (h >>> 4);
+ }
+
+}
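For context, here is a minimal usage sketch of the LongHashMap above. The surrounding class and the values stored are illustrative only; the map API (put, get, remove, valuesIterator, size) is exactly as declared in this file:

    import java.util.Iterator;
    import org.apache.hama.jdbm.LongHashMap;

    public class LongHashMapExample {
      public static void main(String[] args) {
        LongHashMap<String> map = new LongHashMap<String>(32);
        map.put(42L, "page-42");          // primitive key: no Long boxing
        map.put(7L, "page-7");
        System.out.println(map.get(42L)); // prints "page-42"
        map.remove(7L);                   // the freed Entry is kept for reuse
        for (Iterator<String> it = map.valuesIterator(); it.hasNext();) {
          System.out.println(it.next());  // prints the remaining value
        }
        System.out.println(map.size());   // prints 1
      }
    }

Note how remove() parks the freed Entry in reuseAfterDelete so the next put() can recycle it instead of allocating a new one.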
Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/LongPacker.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/LongPacker.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/LongPacker.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/LongPacker.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Packing utility for non-negative <code>long</code> and <code>int</code> values.
+ * <p/>
+ * Originally developed for Kryo by Nathan Sweet. Modified for JDBM by Jan Kotek
+ */
+public final class LongPacker {
+
+ /**
+ * Packs a non-negative long into the output stream. It will occupy 1-10
+ * bytes depending on the value (lower values occupy less space).
+ *
+ * @param os the output stream to write to
+ * @param value the non-negative value to pack
+ * @throws IOException if the underlying stream fails
+ */
+ public static void packLong(DataOutput os, long value) throws IOException {
+
+ if (value < 0) {
+ throw new IllegalArgumentException("negative value: v=" + value);
+ }
+
+ while ((value & ~0x7FL) != 0) {
+ os.write((((int) value & 0x7F) | 0x80));
+ value >>>= 7;
+ }
+ os.write((byte) value);
+ }
+
+ /**
+ * Unpacks a non-negative long value from the input stream.
+ *
+ * @param is The input stream.
+ * @return The long value.
+ * @throws java.io.IOException
+ */
+ public static long unpackLong(DataInput is) throws IOException {
+
+ long result = 0;
+ for (int offset = 0; offset < 64; offset += 7) {
+ long b = is.readUnsignedByte();
+ result |= (b & 0x7F) << offset;
+ if ((b & 0x80) == 0) {
+ return result;
+ }
+ }
+ throw new Error("Malformed long.");
+ }
+
+ /**
+ * Packs a non-negative int into the output stream. It will occupy 1-5 bytes
+ * depending on the value (lower values occupy less space).
+ *
+ * @param os the output stream to write to
+ * @param value the non-negative value to pack
+ * @throws IOException if the underlying stream fails
+ */
+ public static void packInt(DataOutput os, int value) throws IOException {
+
+ if (value < 0) {
+ throw new IllegalArgumentException("negative value: v=" + value);
+ }
+
+ while ((value & ~0x7F) != 0) {
+ os.write(((value & 0x7F) | 0x80));
+ value >>>= 7;
+ }
+
+ os.write((byte) value);
+ }
+
+ public static int unpackInt(DataInput is) throws IOException {
+ for (int offset = 0, result = 0; offset < 32; offset += 7) {
+ int b = is.readUnsignedByte();
+ result |= (b & 0x7F) << offset;
+ if ((b & 0x80) == 0) {
+ return result;
+ }
+ }
+ throw new Error("Malformed integer.");
+
+ }
+
+}
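As a worked example of the 7-bits-per-byte encoding above (the stream setup is illustrative; LongPacker is the class from this commit): 127 fits in a single byte, 128 spills into a continuation byte, and the round trip recovers the original values.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hama.jdbm.LongPacker;

    public class LongPackerExample {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        LongPacker.packLong(out, 127L); // 1 byte:  0x7F
        LongPacker.packLong(out, 128L); // 2 bytes: 0x80 0x01
        LongPacker.packInt(out, 300);   // 2 bytes: 0xAC 0x02
        System.out.println(buf.size()); // prints 5

        DataInputStream in = new DataInputStream(
            new ByteArrayInputStream(buf.toByteArray()));
        System.out.println(LongPacker.unpackLong(in)); // prints 127
        System.out.println(LongPacker.unpackLong(in)); // prints 128
        System.out.println(LongPacker.unpackInt(in));  // prints 300
      }
    }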
Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/Magic.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/Magic.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/Magic.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/Magic.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+/**
+ * This interface contains magic cookies.
+ */
+public interface Magic {
+ /**
+ * Magic cookie at start of file
+ */
+ short FILE_HEADER = 0x1350;
+
+ /**
+ * Magic for pages; the page-type codes below are added to this base value.
+ */
+ short PAGE_MAGIC = 0x1351;
+
+ /**
+ * Magics for pages in certain lists.
+ */
+ short FREE_PAGE = 0;
+ short USED_PAGE = 1;
+ short TRANSLATION_PAGE = 2;
+ short FREELOGIDS_PAGE = 3;
+ short FREEPHYSIDS_PAGE = 4;
+ short FREEPHYSIDS_ROOT_PAGE = 5;
+
+ /**
+ * Number of lists in a file
+ */
+ short NLISTS = 6;
+
+ /**
+ * Magic for transaction file
+ */
+ short LOGFILE_HEADER = 0x1360;
+
+ /**
+ * Size of an externalized byte
+ */
+ short SZ_BYTE = 1;
+ /**
+ * Size of an externalized short
+ */
+ short SZ_SHORT = 2;
+
+ /**
+ * Size of an externalized int
+ */
+ short SZ_INT = 4;
+ /**
+ * Size of an externalized long
+ */
+ short SZ_LONG = 8;
+
+ /**
+ * Size of an externalized six-byte long
+ */
+ short SZ_SIX_BYTE_LONG = 6;
+
+ /** offsets in file header (zero page in file) */
+ short FILE_HEADER_O_MAGIC = 0; // short magic
+ short FILE_HEADER_O_LISTS = Magic.SZ_SHORT; // long[2*NLISTS]
+ int FILE_HEADER_O_ROOTS = FILE_HEADER_O_LISTS
+ + (Magic.NLISTS * 2 * Magic.SZ_LONG);
+ /**
+ * The number of "root" rowids available in the file.
+ */
+ int FILE_HEADER_NROOTS = 16;
+
+ short PAGE_HEADER_O_MAGIC = 0; // short magic
+ short PAGE_HEADER_O_NEXT = Magic.SZ_SHORT;
+ short PAGE_HEADER_O_PREV = PAGE_HEADER_O_NEXT + Magic.SZ_SIX_BYTE_LONG;
+ short PAGE_HEADER_SIZE = PAGE_HEADER_O_PREV + Magic.SZ_SIX_BYTE_LONG;
+
+ short PhysicalRowId_O_LOCATION = 0; // long page
+ // short PhysicalRowId_O_OFFSET = Magic.SZ_SIX_BYTE_LONG; // short offset
+ int PhysicalRowId_SIZE = Magic.SZ_SIX_BYTE_LONG;
+
+ short DATA_PAGE_O_FIRST = PAGE_HEADER_SIZE; // short firstrowid
+ short DATA_PAGE_O_DATA = (short) (DATA_PAGE_O_FIRST + Magic.SZ_SHORT);
+ short DATA_PER_PAGE = (short) (Storage.PAGE_SIZE - DATA_PAGE_O_DATA);
+
+}
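To make the header offsets concrete, a small worked check (illustrative only; the reading of the list table as pairs of longs follows the long[2*NLISTS] comment above, and treating each root id as a long is our assumption):

    // The list table follows the 2-byte magic; NLISTS = 6 lists,
    // each apparently storing two longs (long[2 * NLISTS]):
    int oRoots = Magic.FILE_HEADER_O_LISTS + Magic.NLISTS * 2 * Magic.SZ_LONG;
    // oRoots == 2 + 6 * 2 * 8 == 98 == Magic.FILE_HEADER_O_ROOTS
    // Assuming each of the 16 root ids is a long, they end at byte 226:
    int rootsEnd = oRoots + Magic.FILE_HEADER_NROOTS * Magic.SZ_LONG;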
Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/PageFile.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/PageFile.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/PageFile.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/PageFile.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,382 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.io.IOError;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Iterator;
+
+import javax.crypto.Cipher;
+
+/**
+ * This class represents a random access file as a set of fixed size records.
+ * Each record has a physical record number, and records are cached in order to
+ * improve access.
+ * <p/>
+ * The set of dirty records on the in-use list constitutes a transaction.
+ * Later on, these records are handed to the transaction manager for recovery.
+ * <p/>
+ * A PageFile is split across multiple physical files, each with a maximum size of 1GB.
+ */
+public final class PageFile {
+ final PageTransactionManager txnMgr;
+
+ /**
+ * Pages currently locked for read/update operations. When released, a page
+ * moves to the dirty or in-transaction list, depending on its flags. The file
+ * header page is normally locked, plus the page currently being read or
+ * modified.
+ *
+ * @see PageIo#isDirty()
+ */
+ private final LongHashMap<PageIo> inUse = new LongHashMap<PageIo>();
+
+ /**
+ * Pages whose state is dirty.
+ */
+ private final LongHashMap<PageIo> dirty = new LongHashMap<PageIo>();
+ /**
+ * Pages in <em>historical</em> transactions that have been written to the
+ * log but not yet committed to the database.
+ */
+ private final LongHashMap<PageIo> inTxn = new LongHashMap<PageIo>();
+
+ // transactions disabled?
+ final boolean transactionsDisabled;
+
+ /**
+ * An array of zeroed data used to wipe pages clean.
+ */
+ static final byte[] CLEAN_DATA = new byte[Storage.PAGE_SIZE];
+
+ final Storage storage;
+ private Cipher cipherOut;
+ private Cipher cipherIn;
+
+ /**
+ * Creates a new object on the indicated filename. The file is opened in
+ * read/write mode.
+ *
+ * @param fileName the name of the file to open or create, without an
+ * extension.
+ * @throws IOException whenever the creation of the underlying
+ * RandomAccessFile throws it.
+ */
+ PageFile(String fileName, boolean readonly, boolean transactionsDisabled,
+ Cipher cipherIn, Cipher cipherOut, boolean useRandomAccessFile,
+ boolean lockingDisabled) throws IOException {
+ this.cipherIn = cipherIn;
+ this.cipherOut = cipherOut;
+ this.transactionsDisabled = transactionsDisabled;
+ if (fileName == null) {
+ this.storage = new StorageMemory(transactionsDisabled);
+ } else if (useRandomAccessFile) {
+ this.storage = new StorageDisk(fileName, readonly, lockingDisabled);
+ } else {
+ this.storage = new StorageDiskMapped(fileName, readonly,
+ transactionsDisabled, lockingDisabled);
+ }
+ if (this.storage.isReadonly() && !readonly)
+ throw new IllegalArgumentException(
+ "This type of storage is readonly, you should call readonly() on DBMaker");
+ if (!readonly && !transactionsDisabled) {
+ txnMgr = new PageTransactionManager(this, storage, cipherIn, cipherOut);
+ } else {
+ txnMgr = null;
+ }
+ }
+
+ public PageFile(String filename) throws IOException {
+ this(filename, false, false, null, null, false, false);
+ }
+
+ /**
+ * Gets a page from the file. The returned PageIo wraps the in-memory copy of
+ * the record, and thus can be modified (and subsequently released with the
+ * dirty flag set in order to write the page back). If transactions are
+ * disabled, changes may be written directly to storage.
+ *
+ * @param pageId The record number to retrieve.
+ */
+ PageIo get(long pageId) throws IOException {
+
+ // try in transaction list, dirty list, free list
+ PageIo node = inTxn.get(pageId);
+ if (node != null) {
+ inTxn.remove(pageId);
+ inUse.put(pageId, node);
+ return node;
+ }
+ node = dirty.get(pageId);
+ if (node != null) {
+ dirty.remove(pageId);
+ inUse.put(pageId, node);
+ return node;
+ }
+
+ // sanity check: can't be on in use list
+ if (inUse.get(pageId) != null) {
+ throw new Error("double get for page " + pageId);
+ }
+
+ // read node from file
+ if (cipherOut == null) {
+ node = new PageIo(pageId, storage.read(pageId));
+ } else {
+ // decrypt if needed
+ ByteBuffer b = storage.read(pageId);
+ byte[] bb;
+ if (b.hasArray()) {
+ bb = b.array();
+ } else {
+ bb = new byte[Storage.PAGE_SIZE];
+ b.position(0);
+ b.get(bb, 0, Storage.PAGE_SIZE);
+ }
+ if (!JDBMUtils.allZeros(bb))
+ try {
+ bb = cipherOut.doFinal(bb);
+ node = new PageIo(pageId, ByteBuffer.wrap(bb));
+ } catch (Exception e) {
+ throw new IOError(e);
+ }
+ else {
+ node = new PageIo(pageId, ByteBuffer.wrap(PageFile.CLEAN_DATA)
+ .asReadOnlyBuffer());
+ }
+ }
+
+ inUse.put(pageId, node);
+ node.setClean();
+ return node;
+ }
+
+ /**
+ * Releases a page.
+ *
+ * @param pageId The record number to release.
+ * @param isDirty If true, the page was modified since the get().
+ */
+ void release(final long pageId, final boolean isDirty) throws IOException {
+
+ final PageIo page = inUse.remove(pageId);
+ if (!page.isDirty() && isDirty)
+ page.setDirty();
+
+ if (page.isDirty()) {
+ dirty.put(pageId, page);
+ } else if (!transactionsDisabled && page.isInTransaction()) {
+ inTxn.put(pageId, page);
+ }
+ }
+
+ /**
+ * Releases a page.
+ *
+ * @param page The page to release.
+ */
+ void release(final PageIo page) throws IOException {
+ final long key = page.getPageId();
+ inUse.remove(key);
+ if (page.isDirty()) {
+ // System.out.println( "Dirty: " + key + page );
+ dirty.put(key, page);
+ } else if (!transactionsDisabled && page.isInTransaction()) {
+ inTxn.put(key, page);
+ }
+ }
+
+ /**
+ * Discards a page (will not write the page even if it's dirty)
+ *
+ * @param page The page to discard.
+ */
+ void discard(PageIo page) {
+ long key = page.getPageId();
+ inUse.remove(key);
+ }
+
+ /**
+ * Commits the current transaction by flushing all dirty buffers to disk.
+ */
+ void commit() throws IOException {
+ // debugging...
+ if (!inUse.isEmpty() && inUse.size() > 1) {
+ showList(inUse.valuesIterator());
+ throw new Error("in use list not empty at commit time (" + inUse.size()
+ + ")");
+ }
+
+ // System.out.println("committing...");
+
+ if (dirty.size() == 0) {
+ // if no dirty pages, skip commit process
+ return;
+ }
+
+ if (!transactionsDisabled) {
+ txnMgr.start();
+ }
+
+ // sort pages by IDs
+ long[] pageIds = new long[dirty.size()];
+ int c = 0;
+ for (Iterator<PageIo> i = dirty.valuesIterator(); i.hasNext();) {
+ pageIds[c] = i.next().getPageId();
+ c++;
+ }
+ Arrays.sort(pageIds);
+
+ for (long pageId : pageIds) {
+ PageIo node = dirty.get(pageId);
+
+ // System.out.println("node " + node + " map size now " + dirty.size());
+ if (transactionsDisabled) {
+ if (cipherIn != null)
+ storage.write(node.getPageId(),
+ ByteBuffer.wrap(JDBMUtils.encrypt(cipherIn, node.getData())));
+ else
+ storage.write(node.getPageId(), node.getData());
+ node.setClean();
+ } else {
+ txnMgr.add(node);
+ inTxn.put(node.getPageId(), node);
+ }
+ }
+ dirty.clear();
+ if (!transactionsDisabled) {
+ txnMgr.commit();
+ }
+ }
+
+ /**
+ * Rolls back the current transaction by discarding all dirty buffers.
+ */
+ void rollback() throws IOException {
+ // debugging...
+ if (!inUse.isEmpty()) {
+ showList(inUse.valuesIterator());
+ throw new Error("in use list not empty at rollback time (" + inUse.size()
+ + ")");
+ }
+ // System.out.println("rollback...");
+ dirty.clear();
+
+ txnMgr.synchronizeLogFromDisk();
+
+ if (!inTxn.isEmpty()) {
+ showList(inTxn.valuesIterator());
+ throw new Error("in txn list not empty at rollback time (" + inTxn.size()
+ + ")");
+ }
+ }
+
+ /**
+ * Commits and closes file.
+ */
+ void close() throws IOException {
+ if (!dirty.isEmpty()) {
+ commit();
+ }
+
+ if (!transactionsDisabled && txnMgr != null) {
+ txnMgr.shutdown();
+ }
+
+ if (!inTxn.isEmpty()) {
+ showList(inTxn.valuesIterator());
+ throw new Error("In transaction not empty");
+ }
+
+ // these sanity checks are cheap enough to keep even in a production release
+ if (!dirty.isEmpty()) {
+ System.out.println("ERROR: dirty pages at close time");
+ showList(dirty.valuesIterator());
+ throw new Error("Dirty pages at close time");
+ }
+ if (!inUse.isEmpty()) {
+ System.out.println("ERROR: inUse pages at close time");
+ showList(inUse.valuesIterator());
+ throw new Error("inUse pages at close time");
+ }
+
+ storage.sync();
+ storage.forceClose();
+ }
+
+ /**
+ * Force-closes the file and the underlying transaction manager. Used for
+ * testing purposes only.
+ */
+ void forceClose() throws IOException {
+ if (!transactionsDisabled) {
+ txnMgr.forceClose();
+ }
+ storage.forceClose();
+ }
+
+ /**
+ * Prints contents of a list
+ */
+ private void showList(Iterator<PageIo> i) {
+ int cnt = 0;
+ while (i.hasNext()) {
+ System.out.println("elem " + cnt + ": " + i.next());
+ cnt++;
+ }
+ }
+
+ /**
+ * Syncs a node to disk. This is called by the transaction manager's
+ * synchronization code.
+ */
+ void synch(PageIo node) throws IOException {
+ ByteBuffer data = node.getData();
+ if (data != null) {
+ if (cipherIn != null)
+ storage.write(node.getPageId(),
+ ByteBuffer.wrap(JDBMUtils.encrypt(cipherIn, data)));
+ else
+ storage.write(node.getPageId(), data);
+ }
+ }
+
+ /**
+ * Releases a node from the transaction list, if it is present there.
+ */
+ void releaseFromTransaction(PageIo node) throws IOException {
+ inTxn.remove(node.getPageId());
+ }
+
+ /**
+ * Synchronizes the file.
+ */
+ void sync() throws IOException {
+ storage.sync();
+ }
+
+ public int getDirtyPageCount() {
+ return dirty.size();
+ }
+
+ public void deleteAllFiles() throws IOException {
+ storage.deleteAllFiles();
+ }
+}
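A minimal sketch of the get/release/commit cycle described above. This is illustrative only: the path is hypothetical, get/release/commit are package-private (so the caller must live in org.apache.hama.jdbm), and the actual mutation of page contents through PageIo is elided:

    package org.apache.hama.jdbm;

    public class PageFileExample {
      public static void main(String[] args) throws Exception {
        PageFile file = new PageFile("/tmp/example-db"); // hypothetical path
        PageIo page = file.get(1);  // page moves onto the in-use list
        // ... modify the page via its PageIo here ...
        file.release(1, true);      // dirty flag set: page joins the dirty list
        file.commit();              // dirty pages flushed via the txn manager
        file.close();               // commits any remainder and closes storage
      }
    }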