Posted to commits@hama.apache.org by tj...@apache.org on 2012/09/19 13:52:24 UTC

svn commit: r1387533 [4/10] - in /hama/trunk: ./ core/ core/src/main/java/org/apache/hama/bsp/ graph/ graph/src/main/java/org/apache/hama/graph/ jdbm/ jdbm/src/ jdbm/src/main/ jdbm/src/main/java/ jdbm/src/main/java/org/ jdbm/src/main/java/org/apache/ j...

Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/HTreeBucket.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/HTreeBucket.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/HTreeBucket.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/HTreeBucket.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.jdbm;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+
+/**
+ * A bucket is a placeholder for multiple (key, value) pairs. Buckets are used
+ * to store collisions (same hash value) at all levels of an H*tree.
+ * <p/>
+ * There are two types of buckets: leaf and non-leaf.
+ * <p/>
+ * Non-leaf buckets hold collisions that occur while the H*tree is not yet
+ * fully expanded. Keys in a non-leaf bucket can have different hash codes.
+ * Non-leaf buckets are limited to an arbitrary size. When this limit is
+ * reached, the H*tree creates a new HTreeDirectory node and distributes the
+ * keys of the non-leaf bucket into the newly created HTreeDirectory.
+ * <p/>
+ * A leaf bucket is a bucket which contains keys which all have the same
+ * <code>hashCode()</code>. Leaf buckets stand at the bottom of an H*tree
+ * because the hashing algorithm cannot further discriminate between different
+ * keys based on their hash code.
+ */
+public final class HTreeBucket<K, V> {
+
+  /**
+   * The maximum number of elements (key, value) a non-leaf bucket can contain.
+   */
+  public static final int OVERFLOW_SIZE = 16;
+
+  /**
+   * Depth of this bucket.
+   */
+  private byte _depth;
+
+  /**
+   * Keys and values in this bucket. Keys occupy indexes 0..OVERFLOW_SIZE-1;
+   * the value for the key at index i is stored at index i + OVERFLOW_SIZE.
+   */
+  private Object[] _keysAndValues;
+
+  private byte size = 0;
+
+  private final HTree<K, V> tree;
+
+  /**
+   * Public constructor for serialization.
+   */
+  public HTreeBucket(HTree<K, V> tree) {
+    this.tree = tree;
+  }
+
+  /**
+   * Construct a bucket with a given depth level. The depth level is the number
+   * of <code>HTreeDirectory</code> levels above this bucket.
+   */
+  public HTreeBucket(HTree<K, V> tree, byte level) {
+    this.tree = tree;
+    if (level > HTreeDirectory.MAX_DEPTH + 1) {
+      throw new IllegalArgumentException(
+          "Cannot create bucket with depth > MAX_DEPTH+1. " + "Depth=" + level);
+    }
+    _depth = level;
+    _keysAndValues = new Object[OVERFLOW_SIZE * 2];
+  }
+
+  /**
+   * Returns the number of elements contained in this bucket.
+   */
+  public int getElementCount() {
+    return size;
+  }
+
+  /**
+   * Returns whether or not this bucket is a "leaf bucket".
+   */
+  public boolean isLeaf() {
+    return (_depth > HTreeDirectory.MAX_DEPTH);
+  }
+
+  /**
+   * Returns true if bucket can accept at least one more element.
+   */
+  public boolean hasRoom() {
+    if (isLeaf()) {
+      return true; // leaf buckets are never full
+    } else {
+      // non-leaf bucket
+      return (size < OVERFLOW_SIZE);
+    }
+  }
+
+  /**
+   * Add an element (key, value) to this bucket. If an existing element has the
+   * same key, it is replaced silently.
+   * 
+   * @return Object which was previously associated with the given key or
+   *         <code>null</code> if no association existed.
+   */
+  public V addElement(K key, V value) {
+    // find entry
+    byte existing = -1;
+    for (byte i = 0; i < size; i++) {
+      if (key.equals(_keysAndValues[i])) {
+        existing = i;
+        break;
+      }
+    }
+
+    if (existing != -1) {
+      // replace existing element
+      Object before = _keysAndValues[existing + OVERFLOW_SIZE];
+      if (before instanceof BTreeLazyRecord) {
+        BTreeLazyRecord<V> rec = (BTreeLazyRecord<V>) before;
+        before = rec.get();
+        rec.delete();
+      }
+      _keysAndValues[existing + OVERFLOW_SIZE] = value;
+      return (V) before;
+    } else {
+      // add new (key, value) pair
+      _keysAndValues[size] = key;
+      _keysAndValues[size + OVERFLOW_SIZE] = value;
+      size++;
+      return null;
+    }
+  }
+
+  /**
+   * Remove an element, given a specific key.
+   * 
+   * @param key Key of the element to remove
+   * @return Removed element value, or <code>null</code> if not found
+   */
+  public V removeElement(K key) {
+    // find entry
+    byte existing = -1;
+    for (byte i = 0; i < size; i++) {
+      if (key.equals(_keysAndValues[i])) {
+        existing = i;
+        break;
+      }
+    }
+
+    if (existing != -1) {
+      Object o = _keysAndValues[existing + OVERFLOW_SIZE];
+      if (o instanceof BTreeLazyRecord) {
+        BTreeLazyRecord<V> rec = (BTreeLazyRecord<V>) o;
+        o = rec.get();
+        rec.delete();
+      }
+
+      // move last element to existing
+      size--;
+      _keysAndValues[existing] = _keysAndValues[size];
+      _keysAndValues[existing + OVERFLOW_SIZE] = _keysAndValues[size
+          + OVERFLOW_SIZE];
+
+      // and unset last element
+      _keysAndValues[size] = null;
+      _keysAndValues[size + OVERFLOW_SIZE] = null;
+
+      return (V) o;
+    } else {
+      // not found
+      return null;
+    }
+  }
+
+  /**
+   * Returns the value associated with a given key. If the given key is not
+   * found in this bucket, returns <code>null</code>.
+   */
+  public V getValue(K key) {
+    // find entry
+    byte existing = -1;
+    for (byte i = 0; i < size; i++) {
+      if (key.equals(_keysAndValues[i])) {
+        existing = i;
+        break;
+      }
+    }
+
+    if (existing != -1) {
+      Object o = _keysAndValues[existing + OVERFLOW_SIZE];
+      if (o instanceof BTreeLazyRecord)
+        return ((BTreeLazyRecord<V>) o).get();
+      else
+        return (V) o;
+    } else {
+      // key not found
+      return null;
+    }
+  }
+
+  /**
+   * Obtain the keys contained in this bucket. Keys are ordered to match their
+   * values, which can be obtained by calling <code>getValues()</code>.
+   * <p/>
+   * A new <code>ArrayList</code> is constructed on each call.
+   */
+  ArrayList<K> getKeys() {
+    ArrayList<K> ret = new ArrayList<K>();
+    for (byte i = 0; i < size; i++) {
+      ret.add((K) _keysAndValues[i]);
+    }
+    return ret;
+  }
+
+  /**
+   * Obtain the values contained in this bucket. Values are ordered to match
+   * their keys, which can be obtained by calling <code>getKeys()</code>.
+   * <p/>
+   * A new <code>ArrayList</code> is constructed on each call.
+   */
+  ArrayList<V> getValues() {
+    ArrayList<V> ret = new ArrayList<V>();
+    for (byte i = 0; i < size; i++) {
+      ret.add((V) _keysAndValues[i + OVERFLOW_SIZE]);
+    }
+    return ret;
+
+  }
+
+  public void writeExternal(DataOutput out) throws IOException {
+    out.write(_depth);
+    out.write(size);
+
+    DataInputOutput out3 = tree.writeBufferCache.getAndSet(null);
+    if (out3 == null)
+      out3 = new DataInputOutput();
+    else
+      out3.reset();
+
+    Serializer keySerializer = tree.keySerializer != null ? tree.keySerializer
+        : tree.getRecordManager().defaultSerializer();
+    for (byte i = 0; i < size; i++) {
+      out3.reset();
+      keySerializer.serialize(out3, _keysAndValues[i]);
+      LongPacker.packInt(out, out3.getPos());
+      out.write(out3.getBuf(), 0, out3.getPos());
+
+    }
+
+    // write values
+    if (tree.hasValues()) {
+      Serializer valSerializer = tree.valueSerializer != null ? tree.valueSerializer
+          : tree.getRecordManager().defaultSerializer();
+
+      for (byte i = 0; i < size; i++) {
+        Object value = _keysAndValues[i + OVERFLOW_SIZE];
+        if (value == null) {
+          out.write(BTreeLazyRecord.NULL);
+        } else if (value instanceof BTreeLazyRecord) {
+          out.write(BTreeLazyRecord.LAZY_RECORD);
+          LongPacker.packLong(out, ((BTreeLazyRecord) value).recid);
+        } else {
+          // transform to byte array
+          out3.reset();
+          valSerializer.serialize(out3, value);
+
+          if (out3.getPos() > BTreeLazyRecord.MAX_INTREE_RECORD_SIZE) {
+            // store as separate record
+            long recid = tree.getRecordManager().insert(out3.toByteArray(),
+                BTreeLazyRecord.FAKE_SERIALIZER, true);
+            out.write(BTreeLazyRecord.LAZY_RECORD);
+            LongPacker.packLong(out, recid);
+          } else {
+            out.write(out3.getPos());
+            out.write(out3.getBuf(), 0, out3.getPos());
+          }
+        }
+      }
+    }
+    tree.writeBufferCache.set(out3);
+
+  }
+
+  public void readExternal(DataInputOutput in) throws IOException,
+      ClassNotFoundException {
+    _depth = in.readByte();
+    size = in.readByte();
+
+    // read keys
+    Serializer keySerializer = tree.keySerializer != null ? tree.keySerializer
+        : tree.getRecordManager().defaultSerializer();
+    _keysAndValues = (K[]) new Object[OVERFLOW_SIZE * 2];
+    for (byte i = 0; i < size; i++) {
+      int expectedSize = LongPacker.unpackInt(in);
+      K key = (K) BTreeLazyRecord.fastDeser(in, keySerializer, expectedSize);
+      _keysAndValues[i] = key;
+    }
+
+    // read values
+    if (tree.hasValues()) {
+      Serializer<V> valSerializer = tree.valueSerializer != null ? tree.valueSerializer
+          : (Serializer<V>) tree.getRecordManager().defaultSerializer();
+      for (byte i = 0; i < size; i++) {
+        int header = in.readUnsignedByte();
+        if (header == BTreeLazyRecord.NULL) {
+          _keysAndValues[i + OVERFLOW_SIZE] = null;
+        } else if (header == BTreeLazyRecord.LAZY_RECORD) {
+          long recid = LongPacker.unpackLong(in);
+          _keysAndValues[i + OVERFLOW_SIZE] = (new BTreeLazyRecord(
+              tree.getRecordManager(), recid, valSerializer));
+        } else {
+          _keysAndValues[i + OVERFLOW_SIZE] = BTreeLazyRecord.fastDeser(in,
+              valSerializer, header);
+        }
+      }
+    } else {
+      for (byte i = 0; i < size; i++) {
+        if (_keysAndValues[i] != null)
+          _keysAndValues[i + OVERFLOW_SIZE] = JDBMUtils.EMPTY_STRING;
+      }
+    }
+  }
+}
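
For illustration, a minimal standalone sketch of the parallel-array layout HTreeBucket uses (the key at index i has its value at index i + OVERFLOW_SIZE); the class and method names below are made up for the example and are not part of the Hama sources:

    public class BucketLayoutSketch {
      static final int OVERFLOW_SIZE = 16;
      private final Object[] keysAndValues = new Object[OVERFLOW_SIZE * 2];
      private int size = 0;

      /** Mirrors HTreeBucket.addElement: replace on existing key, append otherwise. */
      Object put(Object key, Object value) {
        for (int i = 0; i < size; i++) {
          if (key.equals(keysAndValues[i])) {
            Object before = keysAndValues[i + OVERFLOW_SIZE];
            keysAndValues[i + OVERFLOW_SIZE] = value;
            return before;
          }
        }
        keysAndValues[size] = key;
        keysAndValues[size + OVERFLOW_SIZE] = value;
        size++;
        return null;
      }

      /** Mirrors HTreeBucket.getValue: linear scan over the key half of the array. */
      Object get(Object key) {
        for (int i = 0; i < size; i++) {
          if (key.equals(keysAndValues[i])) {
            return keysAndValues[i + OVERFLOW_SIZE];
          }
        }
        return null;
      }

      public static void main(String[] args) {
        BucketLayoutSketch b = new BucketLayoutSketch();
        b.put("k1", "v1");
        System.out.println(b.get("k1"));        // v1
        System.out.println(b.put("k1", "v2"));  // v1 (replaced)
      }
    }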

Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/HTreeDirectory.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/HTreeDirectory.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/HTreeDirectory.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/HTreeDirectory.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,601 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.jdbm;
+
+import java.io.DataOutput;
+import java.io.IOError;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * Hashtable directory page.
+ */
+public final class HTreeDirectory<K, V> {
+
+  /**
+   * Maximum number of children in a directory.
+   * <p/>
+   * (Must be a power of 2 -- if you update this value, you must also update
+   * BIT_SIZE and MAX_DEPTH.)
+   * <p/>
+   * Do not change this value: it affects the storage format, and there are
+   * magic numbers that rely on 255.
+   */
+  static final int MAX_CHILDREN = 256;
+
+  /**
+   * Number of significant bits per directory level.
+   */
+  static final int BIT_SIZE = 8; // log2(256) = 8
+
+  /**
+   * Maximum number of levels (zero-based).
+   * <p/>
+   * (4 levels * 8 bits = 32 bits, the size of an <code>int</code>, which is
+   * the type of Java hash codes.)
+   */
+  static final int MAX_DEPTH = 3; // 4 levels
+
+  /**
+   * Record ids of child nodes. They are stored as a two-dimensional array to
+   * save memory; some sub-arrays may be null.
+   */
+  private long[][] _children;
+
+  /**
+   * Depth of this directory page, zero-based
+   */
+  private byte _depth;
+
+  /**
+   * This directory's record ID in the DB. (transient)
+   */
+  private long _recid;
+
+  /** if this is root (depth=0), it contains size, otherwise -1 */
+  long size;
+
+  protected final HTree<K, V> tree;
+
+  /**
+   * Public constructor used by serialization
+   */
+  public HTreeDirectory(HTree<K, V> tree) {
+    this.tree = tree;
+  }
+
+  /**
+   * Construct a HashDirectory
+   * 
+   * @param depth Depth of this directory node.
+   */
+  HTreeDirectory(HTree<K, V> tree, byte depth) {
+    this.tree = tree;
+    _depth = depth;
+    _children = new long[32][];
+  }
+
+  /**
+   * Sets persistence context. This method must be called before any
+   * persistence-related operation.
+   * 
+   * @param recid Record id of this directory.
+   */
+  void setPersistenceContext(long recid) {
+    this._recid = recid;
+  }
+
+  /**
+   * Get the record identifier used to load this hashtable.
+   */
+  long getRecid() {
+    return _recid;
+  }
+
+  /**
+   * Returns whether or not this directory is empty. A directory is empty when
+   * it no longer contains buckets or sub-directories.
+   */
+  boolean isEmpty() {
+    for (int i = 0; i < _children.length; i++) {
+      long[] sub = _children[i];
+      if (sub != null) {
+        for (int j = 0; j < 8; j++) {
+          if (sub[j] != 0) {
+            return false;
+          }
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Returns the value which is associated with the given key. Returns
+   * <code>null</code> if there is no association for this key.
+   * 
+   * @param key key whose associated value is to be returned
+   */
+  V get(K key) throws IOException {
+    int hash = hashCode(key);
+    long child_recid = getRecid(hash);
+    if (child_recid == 0) {
+      // not bucket/node --> not found
+      return null;
+    } else {
+      Object node = tree.db.fetch(child_recid, tree.SERIALIZER);
+      // System.out.println("HashDirectory.get() child is : "+node);
+
+      if (node instanceof HTreeDirectory) {
+        // recurse into next directory level
+        HTreeDirectory<K, V> dir = (HTreeDirectory<K, V>) node;
+        dir.setPersistenceContext(child_recid);
+        return dir.get(key);
+      } else {
+        // node is a bucket
+        HTreeBucket<K, V> bucket = (HTreeBucket) node;
+        return bucket.getValue(key);
+      }
+    }
+  }
+
+  private long getRecid(int hash) {
+    long[] sub = _children[hash >>> 3];
+    return sub == null ? 0 : sub[hash % 8];
+  }
+
+  private void putRecid(int hash, long recid) {
+    long[] sub = _children[hash >>> 3];
+    if (sub == null) {
+      sub = new long[8];
+      _children[hash >>> 3] = sub;
+    }
+    sub[hash % 8] = recid;
+  }
+
+  /**
+   * Associates the specified value with the specified key.
+   * 
+   * @param key key with which the specified value is to be associated.
+   * @param value value to be associated with the specified key.
+   * @return object which was previously associated with the given key, or
+   *         <code>null</code> if no association existed.
+   */
+  Object put(final Object key, final Object value) throws IOException {
+    if (value == null) {
+      return remove(key);
+    }
+    int hash = hashCode(key);
+    long child_recid = getRecid(hash);
+    if (child_recid == 0) {
+      // no bucket/node here yet, let's create a bucket
+      HTreeBucket bucket = new HTreeBucket(tree, (byte) (_depth + 1));
+
+      // insert (key,value) pair in bucket
+      Object existing = bucket.addElement(key, value);
+
+      long b_recid = tree.db.insert(bucket, tree.SERIALIZER, false);
+      putRecid(hash, b_recid);
+
+      tree.db.update(_recid, this, tree.SERIALIZER);
+
+      // System.out.println("Added: "+bucket);
+      return existing;
+    } else {
+      Object node = tree.db.fetch(child_recid, tree.SERIALIZER);
+
+      if (node instanceof HTreeDirectory) {
+        // recursive insert in next directory level
+        HTreeDirectory dir = (HTreeDirectory) node;
+        dir.setPersistenceContext(child_recid);
+        return dir.put(key, value);
+      } else {
+        // node is a bucket
+        HTreeBucket bucket = (HTreeBucket) node;
+        if (bucket.hasRoom()) {
+          Object existing = bucket.addElement(key, value);
+          tree.db.update(child_recid, bucket, tree.SERIALIZER);
+          // System.out.println("Added: "+bucket);
+          return existing;
+        } else {
+          // overflow, so create a new directory
+          if (_depth == MAX_DEPTH) {
+            throw new RuntimeException("Cannot create deeper directory. "
+                + "Depth=" + _depth);
+          }
+          HTreeDirectory dir = new HTreeDirectory(tree, (byte) (_depth + 1));
+          long dir_recid = tree.db.insert(dir, tree.SERIALIZER, false);
+          dir.setPersistenceContext(dir_recid);
+
+          putRecid(hash, dir_recid);
+          tree.db.update(_recid, this, tree.SERIALIZER);
+
+          // discard the overflowing bucket
+          tree.db.delete(child_recid);
+
+          // migrate existing bucket elements
+          ArrayList keys = bucket.getKeys();
+          ArrayList values = bucket.getValues();
+          int entries = keys.size();
+          for (int i = 0; i < entries; i++) {
+            dir.put(keys.get(i), values.get(i));
+          }
+
+          // (finally!) insert new element
+          return dir.put(key, value);
+        }
+      }
+    }
+  }
+
+  /**
+   * Remove the value which is associated with the given key. If the key does
+   * not exist, this method simply ignores the operation.
+   * 
+   * @param key key whose associated value is to be removed
+   * @return object which was associated with the given key, or
+   *         <code>null</code> if no association existed with given key.
+   */
+  Object remove(Object key) throws IOException {
+    int hash = hashCode(key);
+    long child_recid = getRecid(hash);
+    if (child_recid == 0) {
+      // not bucket/node --> not found
+      return null;
+    } else {
+      Object node = tree.db.fetch(child_recid, tree.SERIALIZER);
+      // System.out.println("HashDirectory.remove() child is : "+node);
+
+      if (node instanceof HTreeDirectory) {
+        // recurse into next directory level
+        HTreeDirectory dir = (HTreeDirectory) node;
+        dir.setPersistenceContext(child_recid);
+        Object existing = dir.remove(key);
+        if (existing != null) {
+          if (dir.isEmpty()) {
+            // delete empty directory
+            tree.db.delete(child_recid);
+            putRecid(hash, 0);
+            tree.db.update(_recid, this, tree.SERIALIZER);
+          }
+        }
+        return existing;
+      } else {
+        // node is a bucket
+        HTreeBucket bucket = (HTreeBucket) node;
+        Object existing = bucket.removeElement(key);
+        if (existing != null) {
+          if (bucket.getElementCount() >= 1) {
+            tree.db.update(child_recid, bucket, tree.SERIALIZER);
+          } else {
+            // delete bucket, it's empty
+            tree.db.delete(child_recid);
+            putRecid(hash, 0);
+            tree.db.update(_recid, this, tree.SERIALIZER);
+          }
+        }
+        return existing;
+      }
+    }
+  }
+
+  /**
+   * Calculates the hashcode of a key, based on the current directory depth.
+   */
+  private int hashCode(Object key) {
+    int hashMask = hashMask();
+    int hash = key.hashCode();
+    hash = hash & hashMask;
+    hash = hash >>> ((MAX_DEPTH - _depth) * BIT_SIZE);
+    hash = hash % MAX_CHILDREN;
+    /*
+     * System.out.println("HashDirectory.hashCode() is: 0x"
+     * +Integer.toHexString(hash) +" for object hashCode() 0x"
+     * +Integer.toHexString(key.hashCode()));
+     */
+    return hash;
+  }
+
+  /**
+   * Calculates the hashmask of this directory. The hashmask is the bit mask
+   * applied to a hashcode to retain only bits that are relevant to this
+   * directory level.
+   */
+  int hashMask() {
+    int bits = MAX_CHILDREN - 1;
+    int hashMask = bits << ((MAX_DEPTH - _depth) * BIT_SIZE);
+    /*
+     * System.out.println("HashDirectory.hashMask() is: 0x"
+     * +Integer.toHexString(hashMask));
+     */
+    return hashMask;
+  }
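+
+  /*
+   * Worked example (illustrative): with MAX_CHILDREN = 256, BIT_SIZE = 8 and
+   * MAX_DEPTH = 3, a key whose hashCode() is 0xAABBCCDD is routed as follows:
+   *   depth 0: hashMask = 0xFF000000, hashCode(key) = 0xAA (bits 31..24)
+   *   depth 1: hashMask = 0x00FF0000, hashCode(key) = 0xBB (bits 23..16)
+   *   depth 2: hashMask = 0x0000FF00, hashCode(key) = 0xCC (bits 15..8)
+   *   depth 3: hashMask = 0x000000FF, hashCode(key) = 0xDD (bits 7..0)
+   * Each directory level therefore consumes the next 8 bits of the key's hash
+   * code, starting from the most significant byte.
+   */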
+
+  /**
+   * Returns an iterator over the keys contained in this directory.
+   */
+  Iterator<K> keys() throws IOException {
+    return new HDIterator(true);
+  }
+
+  /**
+   * Returns an iterator over the values contained in this directory.
+   */
+  Iterator<V> values() throws IOException {
+    return new HDIterator(false);
+  }
+
+  public void writeExternal(DataOutput out) throws IOException {
+    out.writeByte(_depth);
+    if (_depth == 0) {
+      LongPacker.packLong(out, size);
+    }
+
+    int zeroStart = 0;
+    for (int i = 0; i < MAX_CHILDREN; i++) {
+      if (getRecid(i) != 0) {
+        zeroStart = i;
+        break;
+      }
+    }
+
+    out.write(zeroStart);
+    if (zeroStart == MAX_CHILDREN)
+      return;
+
+    int zeroEnd = 0;
+    for (int i = MAX_CHILDREN - 1; i >= 0; i--) {
+      if (getRecid(i) != 0) {
+        zeroEnd = i;
+        break;
+      }
+    }
+    out.write(zeroEnd);
+
+    for (int i = zeroStart; i <= zeroEnd; i++) {
+      LongPacker.packLong(out, getRecid(i));
+    }
+  }
+
+  public void readExternal(DataInputOutput in) throws IOException,
+      ClassNotFoundException {
+    _depth = in.readByte();
+    if (_depth == 0)
+      size = LongPacker.unpackLong(in);
+    else
+      size = -1;
+
+    _children = new long[32][];
+    int zeroStart = in.readUnsignedByte();
+    int zeroEnd = in.readUnsignedByte();
+
+    for (int i = zeroStart; i <= zeroEnd; i++) {
+      long recid = LongPacker.unpackLong(in);
+      if (recid != 0)
+        putRecid(i, recid);
+    }
+
+  }
+
+  public void defrag(DBStore r1, DBStore r2) throws IOException,
+      ClassNotFoundException {
+    for (long[] sub : _children) {
+      if (sub == null)
+        continue;
+      for (long child : sub) {
+        if (child == 0)
+          continue;
+        byte[] data = r1.fetchRaw(child);
+        r2.forceInsert(child, data);
+        Object t = tree.SERIALIZER.deserialize(new DataInputOutput(data));
+        if (t instanceof HTreeDirectory) {
+          ((HTreeDirectory) t).defrag(r1, r2);
+        }
+      }
+    }
+  }
+
+  void deleteAllChildren() throws IOException {
+    for (long[] ll : _children) {
+      if (ll != null) {
+        for (long l : ll) {
+          if (l != 0) {
+            tree.db.delete(l);
+          }
+        }
+      }
+    }
+
+  }
+
+  // //////////////////////////////////////////////////////////////////////
+  // INNER CLASS
+  // //////////////////////////////////////////////////////////////////////
+
+  /**
+   * Utility class to enumerate keys/values in a HTree
+   */
+  class HDIterator<A> implements Iterator<A> {
+
+    /**
+     * True if we're iterating on keys, False if enumerating on values.
+     */
+    private boolean _iterateKeys;
+
+    /**
+     * Stacks of directories & last enumerated child position
+     */
+    private ArrayList _dirStack;
+    private ArrayList _childStack;
+
+    /**
+     * Current HashDirectory in the hierarchy
+     */
+    private HTreeDirectory _dir;
+
+    /**
+     * Current child position
+     */
+    private int _child;
+
+    /**
+     * Current bucket iterator
+     */
+    private Iterator<A> _iter;
+
+    private A next;
+
+    /**
+     * last item returned in next(), is used to remove() last item
+     */
+    private A last;
+
+    private int expectedModCount;
+
+    /**
+     * Construct an iterator on this directory.
+     * 
+     * @param iterateKeys True if the iteration supplies keys, false if it
+     *          supplies values.
+     */
+    HDIterator(boolean iterateKeys) throws IOException {
+      _dirStack = new ArrayList();
+      _childStack = new ArrayList();
+      _dir = HTreeDirectory.this;
+      _child = -1;
+      _iterateKeys = iterateKeys;
+      expectedModCount = tree.modCount;
+
+      prepareNext();
+      next = next2();
+
+    }
+
+    /**
+     * Returns the next object.
+     */
+    public A next2() {
+      A next = null;
+      if (_iter != null && _iter.hasNext()) {
+        next = _iter.next();
+      } else {
+        try {
+          prepareNext();
+        } catch (IOException except) {
+          throw new IOError(except);
+        }
+        if (_iter != null && _iter.hasNext()) {
+          return next2();
+        }
+      }
+      return next;
+    }
+
+    /**
+     * Prepare internal state so we can answer <code>hasNext()</code>.
+     * <p/>
+     * This code prepares an iterator over the next bucket to enumerate. If no
+     * following bucket is found, no new iterator is prepared.
+     */
+    private void prepareNext() throws IOException {
+      long child_recid = 0;
+
+      // get next bucket/directory to enumerate
+      do {
+        _child++;
+        if (_child >= MAX_CHILDREN) {
+
+          if (_dirStack.isEmpty()) {
+            // no more directories on the stack, we're finished
+            return;
+          }
+
+          // try next node
+          _dir = (HTreeDirectory) _dirStack.remove(_dirStack.size() - 1);
+          _child = ((Integer) _childStack.remove(_childStack.size() - 1))
+              .intValue();
+          continue;
+        }
+        child_recid = _dir.getRecid(_child);
+      } while (child_recid == 0);
+
+      if (child_recid == 0) {
+        throw new Error("child_recid cannot be 0");
+      }
+
+      Object node = tree.db.fetch(child_recid, tree.SERIALIZER);
+      // System.out.println("HDEnumeration.get() child is : "+node);
+
+      if (node instanceof HTreeDirectory) {
+        // save current position
+        _dirStack.add(_dir);
+        _childStack.add(new Integer(_child));
+
+        _dir = (HTreeDirectory) node;
+        _child = -1;
+
+        // recurse into
+        _dir.setPersistenceContext(child_recid);
+        prepareNext();
+      } else {
+        // node is a bucket
+        HTreeBucket bucket = (HTreeBucket) node;
+        if (_iterateKeys) {
+          ArrayList keys2 = bucket.getKeys();
+          _iter = keys2.iterator();
+        } else {
+          _iter = bucket.getValues().iterator();
+        }
+      }
+    }
+
+    public boolean hasNext() {
+      return next != null;
+    }
+
+    public A next() {
+      if (next == null)
+        throw new NoSuchElementException();
+      if (expectedModCount != tree.modCount)
+        throw new ConcurrentModificationException();
+      last = next;
+      next = next2();
+      return last;
+    }
+
+    public void remove() {
+      if (last == null)
+        throw new IllegalStateException();
+
+      if (expectedModCount != tree.modCount)
+        throw new ConcurrentModificationException();
+
+      // TODO current delete behaviour may change node layout. INVESTIGATE if
+      // this can happen!
+      tree.remove(last);
+      last = null;
+      expectedModCount++;
+    }
+  }
+
+}
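
The directory above addresses its 256 child slots through 32 lazily allocated sub-arrays of 8 longs (see getRecid and putRecid), so fully empty ranges of children cost only a null reference. A self-contained sketch of just that addressing scheme, with made-up names, assuming nothing beyond plain Java:

    public class ChildSlotsSketch {
      private final long[][] children = new long[32][];

      // slot is 0..255; returns 0 when the slot is empty
      long get(int slot) {
        long[] sub = children[slot >>> 3];
        return sub == null ? 0 : sub[slot % 8];
      }

      void put(int slot, long recid) {
        long[] sub = children[slot >>> 3];
        if (sub == null) {
          sub = new long[8];               // allocate the sub-array on first use
          children[slot >>> 3] = sub;
        }
        sub[slot % 8] = recid;
      }

      public static void main(String[] args) {
        ChildSlotsSketch s = new ChildSlotsSketch();
        s.put(170, 42L);                   // slot 0xAA -> recid 42
        System.out.println(s.get(170));    // 42
        System.out.println(s.get(171));    // 0 (empty slot)
      }
    }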

Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/HTreeSet.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/HTreeSet.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/HTreeSet.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/HTreeSet.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.util.AbstractSet;
+import java.util.Iterator;
+
+/**
+ * Wrapper around HTree which implements the java.util.Set interface.
+ */
+public final class HTreeSet<E> extends AbstractSet<E> {
+
+  final HTree<E, Object> map;
+
+  HTreeSet(HTree<E, Object> map) {
+    this.map = map;
+  }
+
+  @Override
+  public Iterator<E> iterator() {
+    return map.keySet().iterator();
+  }
+
+  @Override
+  public int size() {
+    return map.size();
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return map.isEmpty();
+  }
+
+  @Override
+  public boolean contains(Object o) {
+    return map.containsKey(o);
+  }
+
+  @Override
+  public boolean add(E e) {
+    return map.put(e, JDBMUtils.EMPTY_STRING) == null;
+  }
+
+  @Override
+  public boolean remove(Object o) {
+    return map.remove(o) == JDBMUtils.EMPTY_STRING;
+  }
+
+  @Override
+  public void clear() {
+    map.clear();
+  }
+
+}
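
HTreeSet above is a thin adapter that represents set membership as map entries whose value is a shared dummy object. A minimal in-memory sketch of the same pattern, using java.util.HashMap as a stand-in for the disk-backed HTree (the names are illustrative only):

    import java.util.AbstractSet;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    public class MapBackedSetSketch<E> extends AbstractSet<E> {
      // shared dummy value, playing the role of JDBMUtils.EMPTY_STRING
      private static final String PRESENT = "";
      private final Map<E, Object> map = new HashMap<E, Object>();

      @Override public Iterator<E> iterator() { return map.keySet().iterator(); }
      @Override public int size()             { return map.size(); }
      @Override public boolean contains(Object o) { return map.containsKey(o); }
      @Override public boolean add(E e)       { return map.put(e, PRESENT) == null; }
      @Override public boolean remove(Object o) { return map.remove(o) == PRESENT; }

      public static void main(String[] args) {
        MapBackedSetSketch<String> s = new MapBackedSetSketch<String>();
        System.out.println(s.add("a"));     // true  (newly added)
        System.out.println(s.add("a"));     // false (already present)
        System.out.println(s.remove("a"));  // true
      }
    }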

Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/JDBMUtils.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/JDBMUtils.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/JDBMUtils.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/JDBMUtils.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOError;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+
+import javax.crypto.Cipher;
+
+/**
+ * Various utilities used in JDBM
+ */
+public final class JDBMUtils {
+
+  /**
+   * The empty string is used as a dummy value to represent null values in the
+   * hash- and tree-based sets.
+   */
+  static final String EMPTY_STRING = "";
+
+  public static byte[] encrypt(Cipher cipherIn, ByteBuffer b) {
+    if (cipherIn == null && b.hasArray())
+      return b.array();
+    byte[] bb = new byte[Storage.PAGE_SIZE];
+    b.rewind();
+    b.get(bb, 0, Storage.PAGE_SIZE);
+    return encrypt(cipherIn, bb);
+  }
+
+  public static byte[] encrypt(Cipher cipherIn, byte[] b) {
+    if (cipherIn == null)
+      return b;
+
+    try {
+      return cipherIn.doFinal(b);
+    } catch (Exception e) {
+      throw new IOError(e);
+    }
+
+  }
+
+  /**
+   * Compares comparables. Default comparator for most Java types.
+   */
+  static final Comparator COMPARABLE_COMPARATOR = new Comparator<Comparable>() {
+    public int compare(Comparable o1, Comparable o2) {
+      return o1 == null && o2 != null ? -1 : (o1 != null && o2 == null ? 1 : o1
+          .compareTo(o2));
+    }
+  };
+
+  static String formatSpaceUsage(long size) {
+    if (size < 1e4)
+      return size + "B";
+    else if (size < 1e7)
+      return "" + Math.round(1D * size / 1024D) + "KB";
+    else if (size < 1e10)
+      return "" + Math.round(1D * size / 1e6) + "MB";
+    else
+      return "" + Math.round(1D * size / 1e9) + "GB";
+  }
+
+  static boolean allZeros(byte[] b) {
+    for (int i = 0; i < b.length; i++) {
+      if (b[i] != 0)
+        return false;
+    }
+    return true;
+  }
+
+  static <E> E max(E e1, E e2, Comparator comp) {
+    if (e1 == null)
+      return e2;
+    if (e2 == null)
+      return e1;
+
+    if (comp == null)
+      comp = COMPARABLE_COMPARATOR;
+    return comp.compare(e1, e2) < 0 ? e2 : e1;
+  }
+
+  static <E> E min(E e1, E e2, Comparator comp) {
+    if (e1 == null)
+      return e2;
+    if (e2 == null)
+      return e1;
+
+    if (comp == null)
+      comp = COMPARABLE_COMPARATOR;
+
+    return comp.compare(e1, e2) > 0 ? e2 : e1;
+  }
+
+  static final Serializer<Object> NULL_SERIALIZER = new Serializer<Object>() {
+    public void serialize(DataOutput out, Object obj) throws IOException {
+      out.writeByte(11);
+    }
+
+    public Object deserialize(DataInput in) throws IOException,
+        ClassNotFoundException {
+      in.readByte();
+      return null;
+    }
+  };
+
+}
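
A small sketch exercising the same rounding rule as formatSpaceUsage above (the logic is copied from the method; the class name is illustrative):

    public class SpaceUsageSketch {
      static String format(long size) {
        if (size < 1e4)       return size + "B";
        else if (size < 1e7)  return Math.round(1D * size / 1024D) + "KB";
        else if (size < 1e10) return Math.round(1D * size / 1e6) + "MB";
        else                  return Math.round(1D * size / 1e9) + "GB";
      }

      public static void main(String[] args) {
        System.out.println(format(512L));           // 512B
        System.out.println(format(5000000L));       // 4883KB
        System.out.println(format(3000000000L));    // 3000MB
        System.out.println(format(20000000000L));   // 20GB
      }
    }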

Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/LinkedList.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/LinkedList.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/LinkedList.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/LinkedList.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,482 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.jdbm;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOError;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.AbstractSequentialList;
+import java.util.ConcurrentModificationException;
+import java.util.Iterator;
+import java.util.ListIterator;
+import java.util.NoSuchElementException;
+
+/**
+ * LinkedList which stores its nodes on disk.
+ */
+public final class LinkedList<E> extends AbstractSequentialList<E> {
+
+  private DBAbstract db;
+
+  final long rootRecid;
+  /**
+   * Size limit; not currently used, but kept here for future compatibility.
+   * Zero means no limit.
+   */
+  long sizeLimit = 0;
+
+  static final class Root {
+    long first;
+    long last;
+    long size;
+  }
+
+  private static final Serializer<Root> ROOT_SERIALIZER = new Serializer<Root>() {
+
+    @Override
+    public void serialize(DataOutput out, Root obj) throws IOException {
+      LongPacker.packLong(out, obj.first);
+      LongPacker.packLong(out, obj.last);
+      LongPacker.packLong(out, obj.size);
+    }
+
+    @Override
+    public Root deserialize(DataInput in) throws IOException,
+        ClassNotFoundException {
+      Root r = new Root();
+      r.first = LongPacker.unpackLong(in);
+      r.last = LongPacker.unpackLong(in);
+      r.size = LongPacker.unpackLong(in);
+      return r;
+    }
+  };
+
+  private Serializer<E> valueSerializer;
+
+  /**
+   * indicates that entry values should not be loaded during deserialization,
+   * used during defragmentation
+   */
+  protected boolean loadValues = true;
+
+  /** constructor used for deserialization */
+  LinkedList(DBAbstract db, long rootRecid, Serializer<E> valueSerializer) {
+    this.db = db;
+    this.rootRecid = rootRecid;
+    this.valueSerializer = valueSerializer;
+  }
+
+  /** constructor used to create new empty list */
+  LinkedList(DBAbstract db, Serializer<E> valueSerializer) throws IOException {
+    this.db = db;
+    if (valueSerializer != null && !(valueSerializer instanceof Serializable))
+      throw new IllegalArgumentException(
+          "Serializer does not implement Serializable");
+    this.valueSerializer = valueSerializer;
+    // create root
+    this.rootRecid = db.insert(new Root(), ROOT_SERIALIZER, false);
+  }
+
+  void setPersistenceContext(DBAbstract db) {
+    this.db = db;
+  }
+
+  @Override
+  public ListIterator<E> listIterator(int index) {
+    Root r = getRoot();
+    if (index < 0 || index > r.size)
+      throw new IndexOutOfBoundsException();
+
+    Iter iter = new Iter();
+    iter.next = r.first;
+
+    // scroll to requested position
+    // TODO scroll from end, if beyond half
+    for (int i = 0; i < index; i++) {
+      iter.next();
+    }
+    return iter;
+
+  }
+
+  Root getRoot() {
+    // expect that caller already holds lock
+    try {
+      return db.fetch(rootRecid, ROOT_SERIALIZER);
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+  }
+
+  @Override
+  public int size() {
+    return (int) getRoot().size;
+  }
+
+  public Iterator<E> descendingIterator() {
+    return null; // TODO descending iteration is not implemented
+  }
+
+  @Override
+  public boolean add(Object value) {
+    try {
+      Root r = getRoot();
+      Entry e = new Entry(r.last, 0, value);
+      long recid = db.insert(e, entrySerializer, false);
+
+      // update old last Entry to point to new record
+      if (r.last != 0) {
+        Entry oldLast = db.fetch(r.last, entrySerializer);
+        if (oldLast.next != 0)
+          throw new Error();
+        oldLast.next = recid;
+        db.update(r.last, oldLast, entrySerializer);
+      }
+
+      // update linked list
+      r.last = recid;
+      if (r.first == 0)
+        r.first = recid;
+      r.size++;
+      db.update(rootRecid, r, ROOT_SERIALIZER);
+      modCount++;
+      return true;
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+
+  }
+
+  private Entry<E> fetch(long recid) {
+    try {
+      return db.fetch(recid, entrySerializer);
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+
+  }
+
+  /**
+   * called from Serialization object
+   */
+  static LinkedList deserialize(DataInput is, Serialization ser)
+      throws IOException, ClassNotFoundException {
+    long rootrecid = LongPacker.unpackLong(is);
+    long sizeLimit = LongPacker.unpackLong(is);
+    if (sizeLimit != 0)
+      throw new InternalError(
+          "LinkedList.sizeLimit not supported in this JDBM version");
+    Serializer serializer = (Serializer) ser.deserialize(is);
+    return new LinkedList(ser.db, rootrecid, serializer);
+  }
+
+  void serialize(DataOutput out) throws IOException {
+    LongPacker.packLong(out, rootRecid);
+    LongPacker.packLong(out, sizeLimit);
+    db.defaultSerializer().serialize(out, valueSerializer);
+  }
+
+  private final Serializer<Entry> entrySerializer = new Serializer<Entry>() {
+
+    @Override
+    public void serialize(DataOutput out, Entry e) throws IOException {
+      LongPacker.packLong(out, e.prev);
+      LongPacker.packLong(out, e.next);
+      if (valueSerializer != null)
+        valueSerializer.serialize(out, (E) e.value);
+      else
+        db.defaultSerializer().serialize(out, e.value);
+    }
+
+    @Override
+    public Entry<E> deserialize(DataInput in) throws IOException,
+        ClassNotFoundException {
+      long prev = LongPacker.unpackLong(in);
+      long next = LongPacker.unpackLong(in);
+      Object value = null;
+      if (loadValues)
+        value = valueSerializer == null ? db.defaultSerializer()
+            .deserialize(in) : valueSerializer.deserialize(in);
+      return new LinkedList.Entry(prev, next, value);
+    }
+  };
+
+  static class Entry<E> {
+    long prev = 0;
+    long next = 0;
+
+    E value;
+
+    public Entry(long prev, long next, E value) {
+      this.prev = prev;
+      this.next = next;
+      this.value = value;
+    }
+  }
+
+  private final class Iter implements ListIterator<E> {
+
+    private int expectedModCount = modCount;
+    private int index = 0;
+
+    private long prev = 0;
+    private long next = 0;
+
+    private byte lastOper = 0;
+
+    @Override
+    public boolean hasNext() {
+      return next != 0;
+    }
+
+    @Override
+    public E next() {
+      if (next == 0)
+        throw new NoSuchElementException();
+      checkForComodification();
+
+      Entry<E> e = fetch(next);
+
+      prev = next;
+      next = e.next;
+      index++;
+      lastOper = +1;
+      return e.value;
+    }
+
+    @Override
+    public boolean hasPrevious() {
+      return prev != 0;
+    }
+
+    @Override
+    public E previous() {
+      checkForComodification();
+      Entry<E> e = fetch(prev);
+      next = prev;
+      prev = e.prev;
+      index--;
+      lastOper = -1;
+      return e.value;
+    }
+
+    @Override
+    public int nextIndex() {
+      return index;
+    }
+
+    @Override
+    public int previousIndex() {
+      return index - 1;
+    }
+
+    @Override
+    public void remove() {
+      checkForComodification();
+      try {
+        if (lastOper == 1) {
+          // last operation was next() so remove previous element
+          lastOper = 0;
+
+          Entry<E> p = db.fetch(prev, entrySerializer);
+          // update entry before previous
+          if (p.prev != 0) {
+            Entry<E> pp = db.fetch(p.prev, entrySerializer);
+            pp.next = p.next;
+            db.update(p.prev, pp, entrySerializer);
+          }
+          // update entry after next
+          if (p.next != 0) {
+            Entry<E> pn = db.fetch(p.next, entrySerializer);
+            pn.prev = p.prev;
+            db.update(p.next, pn, entrySerializer);
+          }
+          // remove old record from db
+          db.delete(prev);
+          // update list
+          Root r = getRoot();
+          if (r.first == prev)
+            r.first = next;
+          if (r.last == prev)
+            r.last = next;
+          r.size--;
+          db.update(rootRecid, r, ROOT_SERIALIZER);
+          modCount++;
+          expectedModCount++;
+          // update iterator
+          prev = p.prev;
+
+        } else if (lastOper == -1) {
+          // last operation was prev() so remove next element
+          lastOper = 0;
+
+          Entry<E> n = db.fetch(next, entrySerializer);
+          // update entry before next
+          if (n.prev != 0) {
+            Entry<E> pp = db.fetch(n.prev, entrySerializer);
+            pp.next = n.next;
+            db.update(n.prev, pp, entrySerializer);
+          }
+          // update entry after previous
+          if (n.next != 0) {
+            Entry<E> pn = db.fetch(n.next, entrySerializer);
+            pn.prev = n.prev;
+            db.update(n.next, pn, entrySerializer);
+          }
+          // remove old record from db
+          db.delete(next);
+          // update list
+          Root r = getRoot();
+          if (r.last == next)
+            r.last = prev;
+          if (r.first == next)
+            r.first = prev;
+          r.size--;
+          db.update(rootRecid, r, ROOT_SERIALIZER);
+          modCount++;
+          expectedModCount++;
+          // update iterator
+          next = n.next;
+
+        } else
+          throw new IllegalStateException();
+      } catch (IOException e) {
+        throw new IOError(e);
+      }
+
+    }
+
+    @Override
+    public void set(E value) {
+      checkForComodification();
+      try {
+        if (lastOper == 1) {
+          // last operation was next(), so update previous item
+          lastOper = 0;
+          Entry<E> n = db.fetch(prev, entrySerializer);
+          n.value = value;
+          db.update(prev, n, entrySerializer);
+        } else if (lastOper == -1) {
+          // last operation was prev() so update next item
+          lastOper = 0;
+          Entry<E> n = db.fetch(next, entrySerializer);
+          n.value = value;
+          db.update(next, n, entrySerializer);
+        } else
+          throw new IllegalStateException();
+      } catch (IOException e) {
+        throw new IOError(e);
+      }
+
+    }
+
+    @Override
+    public void add(E value) {
+      checkForComodification();
+
+      // use more efficient method if possible
+      if (next == 0) {
+        LinkedList.this.add(value);
+        expectedModCount++;
+        return;
+      }
+      try {
+        // insert new entry
+        Entry<E> e = new Entry<E>(prev, next, value);
+        long recid = db.insert(e, entrySerializer, false);
+
+        // update previous entry
+        if (prev != 0) {
+          Entry<E> p = db.fetch(prev, entrySerializer);
+          if (p.next != next)
+            throw new Error();
+          p.next = recid;
+          db.update(prev, p, entrySerializer);
+        }
+
+        // update next entry
+        Entry<E> n = fetch(next);
+        if (n.prev != prev)
+          throw new Error();
+        n.prev = recid;
+        db.update(next, n, entrySerializer);
+
+        // update List
+        Root r = getRoot();
+        r.size++;
+        db.update(rootRecid, r, ROOT_SERIALIZER);
+
+        // update iterator
+        expectedModCount++;
+        modCount++;
+        prev = recid;
+
+      } catch (IOException e) {
+        throw new IOError(e);
+      }
+
+    }
+
+    final void checkForComodification() {
+      if (modCount != expectedModCount)
+        throw new ConcurrentModificationException();
+    }
+  }
+
+  /**
+   * Copies the collection from one db to another, while keeping logical
+   * recids unchanged.
+   */
+  static void defrag(long recid, DBStore r1, DBStore r2) throws IOException {
+    try {
+      // move linked list itself
+      byte[] data = r1.fetchRaw(recid);
+      r2.forceInsert(recid, data);
+      DataInputOutput in = new DataInputOutput();
+      in.reset(data);
+      LinkedList l = (LinkedList) r1.defaultSerializer().deserialize(in);
+      l.loadValues = false;
+      // move linkedlist root
+      if (l.rootRecid == 0) // empty list, done
+        return;
+
+      data = r1.fetchRaw(l.rootRecid);
+      r2.forceInsert(l.rootRecid, data);
+      in.reset(data);
+      Root r = ROOT_SERIALIZER.deserialize(in);
+      // move all other nodes in linked list
+      long current = r.first;
+      while (current != 0) {
+        data = r1.fetchRaw(current);
+        in.reset(data);
+        r2.forceInsert(current, data);
+
+        Entry e = (Entry) l.entrySerializer.deserialize(in);
+        current = e.next;
+      }
+    } catch (ClassNotFoundException e) {
+      throw new IOError(e);
+    }
+
+  }
+
+}
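
The list above keeps each node as a separate DB record addressed by a recid, and add() appends by linking the new record behind the stored tail. An in-memory model of that append path, where a HashMap stands in for the DB and all names are made up for the example:

    import java.util.HashMap;
    import java.util.Map;

    public class LinkedListAppendSketch {

      static class Entry {
        long prev, next;
        Object value;
        Entry(long prev, long next, Object value) {
          this.prev = prev;
          this.next = next;
          this.value = value;
        }
      }

      static class Root {
        long first, last, size;
      }

      // a HashMap keyed by "recid" stands in for the record store
      private final Map<Long, Entry> store = new HashMap<Long, Entry>();
      private final Root root = new Root();
      private long nextRecid = 1;

      // mirrors LinkedList.add(): append behind the stored tail
      void add(Object value) {
        long recid = nextRecid++;
        store.put(recid, new Entry(root.last, 0, value));
        if (root.last != 0) {
          store.get(root.last).next = recid;   // link old tail forward
        }
        root.last = recid;
        if (root.first == 0) {
          root.first = recid;
        }
        root.size++;
      }

      public static void main(String[] args) {
        LinkedListAppendSketch l = new LinkedListAppendSketch();
        l.add("a");
        l.add("b");
        l.add("c");
        for (long cur = l.root.first; cur != 0; cur = l.store.get(cur).next) {
          System.out.println(l.store.get(cur).value);  // a, b, c
        }
      }
    }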

Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/LogicalRowIdManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/LogicalRowIdManager.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/LogicalRowIdManager.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/LogicalRowIdManager.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * This class manages the linked lists of logical rowid pages.
+ */
+public final class LogicalRowIdManager {
+  // our record file and associated page manager
+  private final PageFile file;
+  private final PageManager pageman;
+  static final short ELEMS_PER_PAGE = (short) ((Storage.PAGE_SIZE - Magic.PAGE_HEADER_SIZE) / Magic.PhysicalRowId_SIZE);
+
+  private long[] freeRecordsInTransRowid = new long[4];
+  private int freeRecordsInTransSize = 0;
+
+  /** Number of free logical rowids on a logical free page; stored as a short. */
+  static final int OFFSET_FREE_COUNT = Magic.PAGE_HEADER_SIZE;
+  static final int FREE_HEADER_SIZE = Magic.PAGE_HEADER_SIZE + Magic.SZ_SHORT;
+  /** Maximum number of free logical rowids per page. */
+  static final int FREE_RECORDS_PER_PAGE = (Storage.PAGE_SIZE - FREE_HEADER_SIZE) / 6;
+
+  /**
+   * Creates a logical rowid manager using the indicated record file and page
+   * manager.
+   */
+  LogicalRowIdManager(PageFile file, PageManager pageman) throws IOException {
+    this.file = file;
+    this.pageman = pageman;
+  }
+
+  /**
+   * Creates a new logical rowid pointing to the indicated physical id
+   * 
+   * @param physloc physical location to point to
+   * @return logical recid
+   */
+  long insert(final long physloc) throws IOException {
+    // check whether there's a free rowid to reuse
+    long retval = getFreeSlot();
+    if (retval == 0) {
+      // no. This means that we bootstrap things by allocating
+      // a new translation page and freeing all the rowids on it.
+      long firstPage = pageman.allocate(Magic.TRANSLATION_PAGE);
+      short curOffset = Magic.PAGE_HEADER_SIZE;
+      for (int i = 0; i < ELEMS_PER_PAGE; i++) {
+        putFreeSlot(((-firstPage) << Storage.PAGE_SIZE_SHIFT)
+            + (long) curOffset);
+
+        curOffset += Magic.PhysicalRowId_SIZE;
+      }
+
+      retval = getFreeSlot();
+      if (retval == 0) {
+        throw new Error("couldn't obtain free translation");
+      }
+    }
+    // write the translation.
+    update(retval, physloc);
+    return retval;
+  }
+
+  /**
+   * Insert at a forced location; use only for defragmentation.
+   * 
+   * @param logicalRowId
+   * @param physLoc
+   * @throws IOException
+   */
+  void forceInsert(final long logicalRowId, final long physLoc)
+      throws IOException {
+    if (fetch(logicalRowId) != 0)
+      throw new Error("can not forceInsert, record already exists: "
+          + logicalRowId);
+
+    update(logicalRowId, physLoc);
+  }
+
+  /**
+   * Releases the indicated logical rowid.
+   */
+  void delete(final long logicalrowid) throws IOException {
+    // zero out old location, is needed for defragmentation
+    final long pageId = -(logicalrowid >>> Storage.PAGE_SIZE_SHIFT);
+    final PageIo xlatPage = file.get(pageId);
+    xlatPage.pageHeaderSetLocation(
+        (short) (logicalrowid & Storage.OFFSET_MASK), 0);
+    file.release(pageId, true);
+    putFreeSlot(logicalrowid);
+  }
+
+  /**
+   * Updates the mapping
+   * 
+   * @param logicalrowid The logical rowid
+   * @param physloc The physical rowid
+   */
+  void update(final long logicalrowid, final long physloc) throws IOException {
+
+    final long pageId = -(logicalrowid >>> Storage.PAGE_SIZE_SHIFT);
+    final PageIo xlatPage = file.get(pageId);
+    xlatPage.pageHeaderSetLocation(
+        (short) (logicalrowid & Storage.OFFSET_MASK), physloc);
+    file.release(pageId, true);
+  }
+
+  /**
+   * Returns a mapping
+   * 
+   * @param logicalrowid The logical rowid
+   * @return The physical rowid, 0 if does not exist
+   */
+  long fetch(long logicalrowid) throws IOException {
+    final long pageId = -(logicalrowid >>> Storage.PAGE_SIZE_SHIFT);
+    final long last = pageman.getLast(Magic.TRANSLATION_PAGE);
+    if (last - 1 > pageId)
+      return 0;
+
+    final short offset = (short) (logicalrowid & Storage.OFFSET_MASK);
+
+    final PageIo xlatPage = file.get(pageId);
+    final long ret = xlatPage.pageHeaderGetLocation(offset);
+
+    file.release(pageId, false);
+    return ret;
+  }
+
+  void commit() throws IOException {
+    if (freeRecordsInTransSize == 0)
+      return;
+
+    long freeRecPageId = pageman.getLast(Magic.FREELOGIDS_PAGE);
+    if (freeRecPageId == 0) {
+      // allocate new
+      freeRecPageId = pageman.allocate(Magic.FREELOGIDS_PAGE);
+    }
+    PageIo freeRecPage = file.get(freeRecPageId);
+    // write all uncommitted free records
+    for (int rowPos = 0; rowPos < freeRecordsInTransSize; rowPos++) {
+      short count = freeRecPage.readShort(OFFSET_FREE_COUNT);
+      if (count == FREE_RECORDS_PER_PAGE) {
+        // allocate new free recid page
+        file.release(freeRecPage);
+        freeRecPageId = pageman.allocate(Magic.FREELOGIDS_PAGE);
+        freeRecPage = file.get(freeRecPageId);
+        freeRecPage.writeShort(FREE_RECORDS_PER_PAGE, (short) 0);
+        count = 0;
+      }
+      final int offset = (count) * 6 + FREE_HEADER_SIZE;
+      // write free recid and increase counter
+      freeRecPage.writeSixByteLong(offset, freeRecordsInTransRowid[rowPos]);
+      count++;
+      freeRecPage.writeShort(OFFSET_FREE_COUNT, count);
+
+    }
+    file.release(freeRecPage);
+
+    clearFreeRecidsInTransaction();
+  }
+
+  private void clearFreeRecidsInTransaction() {
+    if (freeRecordsInTransRowid.length > 128)
+      freeRecordsInTransRowid = new long[4];
+    freeRecordsInTransSize = 0;
+  }
+
+  void rollback() throws IOException {
+    clearFreeRecidsInTransaction();
+  }
+
+  /**
+   * Returns a free Logical rowid, or 0 if nothing was found.
+   */
+  long getFreeSlot() throws IOException {
+    if (freeRecordsInTransSize != 0) {
+      return freeRecordsInTransRowid[--freeRecordsInTransSize];
+    }
+
+    final long logicFreePageId = pageman.getLast(Magic.FREELOGIDS_PAGE);
+    if (logicFreePageId == 0) {
+      return 0;
+    }
+    PageIo logicFreePage = file.get(logicFreePageId);
+    short recCount = logicFreePage.readShort(OFFSET_FREE_COUNT);
+    if (recCount <= 0) {
+      throw new InternalError();
+    }
+
+    final int offset = (recCount - 1) * 6 + FREE_HEADER_SIZE;
+    final long ret = logicFreePage.readSixByteLong(offset);
+
+    recCount--;
+
+    if (recCount > 0) {
+      // decrease counter and zero out old record
+      logicFreePage.writeSixByteLong(offset, 0);
+      logicFreePage.writeShort(OFFSET_FREE_COUNT, recCount);
+      file.release(logicFreePage);
+    } else {
+      // release this page
+      file.release(logicFreePage);
+      pageman.free(Magic.FREELOGIDS_PAGE, logicFreePageId);
+    }
+
+    return ret;
+  }
+
+  /**
+   * Puts the indicated rowid on the free list
+   */
+  void putFreeSlot(long rowid) throws IOException {
+    // ensure capacity
+    if (freeRecordsInTransSize == freeRecordsInTransRowid.length)
+      freeRecordsInTransRowid = Arrays.copyOf(freeRecordsInTransRowid,
+          freeRecordsInTransRowid.length * 4);
+    // add record and increase size
+    freeRecordsInTransRowid[freeRecordsInTransSize] = rowid;
+    freeRecordsInTransSize++;
+  }
+
+}

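For readers following the translation logic in fetch()/update() above, the sketch below shows how a logical rowid is assumed to split into a translation page id and an in-page offset. Storage.PAGE_SIZE_SHIFT and Storage.OFFSET_MASK are defined elsewhere in this commit; the concrete values used here are illustrative assumptions only.

    public class LogicalRowIdSketch {
      // Assumed values for illustration; the real constants live in Storage.
      static final int PAGE_SIZE_SHIFT = 12;       // assumes 4096-byte pages
      static final long OFFSET_MASK = 0xFFFL;      // assumes low 12 bits hold the offset

      public static void main(String[] args) {
        long logicalRowId = (5L << PAGE_SIZE_SHIFT) | 40L;   // page 5, offset 40
        long pageId = -(logicalRowId >>> PAGE_SIZE_SHIFT);   // translation pages use negated ids
        short offset = (short) (logicalRowId & OFFSET_MASK); // slot of the six-byte physical rowid
        System.out.println("pageId=" + pageId + ", offset=" + offset); // pageId=-5, offset=40
      }
    }
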
Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/LongHashMap.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/LongHashMap.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/LongHashMap.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/LongHashMap.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,418 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.jdbm;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * Hash map which uses a primitive long as its key. The main advantage is that a
+ * new instance of Long does not have to be created for each lookup.
+ * <p/>
+ * This code comes from Android, which in turn comes from Apache Harmony. This
+ * class was modified to use primitive longs and stripped down to consume less
+ * space.
+ * <p/>
+ * Author of JDBM modifications: Jan Kotek
+ */
+public final class LongHashMap<V> implements Serializable {
+  private static final long serialVersionUID = 362499999763181265L;
+
+  private int elementCount;
+
+  private Entry<V>[] elementData;
+
+  private final float loadFactor;
+
+  private int threshold;
+
+  private int defaultSize = 16;
+
+  private transient Entry<V> reuseAfterDelete = null;
+
+  static final class Entry<V> implements Serializable {
+    private static final long serialVersionUID = 362445231113181265L;
+
+    Entry<V> next;
+
+    V value;
+
+    long key;
+
+    Entry(long theKey) {
+      this.key = theKey;
+      this.value = null;
+    }
+
+  }
+
+  static class HashMapIterator<V> implements Iterator<V> {
+    private int position = 0;
+
+    boolean canRemove = false;
+
+    Entry<V> entry;
+
+    Entry<V> lastEntry;
+
+    final LongHashMap<V> associatedMap;
+
+    HashMapIterator(LongHashMap<V> hm) {
+      associatedMap = hm;
+    }
+
+    public boolean hasNext() {
+      if (entry != null) {
+        return true;
+      }
+
+      Entry<V>[] elementData = associatedMap.elementData;
+      int length = elementData.length;
+      int newPosition = position;
+      boolean result = false;
+
+      while (newPosition < length) {
+        if (elementData[newPosition] == null) {
+          newPosition++;
+        } else {
+          result = true;
+          break;
+        }
+      }
+
+      position = newPosition;
+      return result;
+    }
+
+    public V next() {
+
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+
+      Entry<V> result;
+      Entry<V> _entry = entry;
+      if (_entry == null) {
+        result = lastEntry = associatedMap.elementData[position++];
+        entry = lastEntry.next;
+      } else {
+        if (lastEntry.next != _entry) {
+          lastEntry = lastEntry.next;
+        }
+        result = _entry;
+        entry = _entry.next;
+      }
+      canRemove = true;
+      return result.value;
+    }
+
+    public void remove() {
+      if (!canRemove) {
+        throw new IllegalStateException();
+      }
+
+      canRemove = false;
+
+      if (lastEntry.next == entry) {
+        while (associatedMap.elementData[--position] == null) {
+          // Do nothing
+        }
+        associatedMap.elementData[position] = associatedMap.elementData[position].next;
+        entry = null;
+      } else {
+        lastEntry.next = entry;
+      }
+      if (lastEntry != null) {
+        Entry<V> reuse = lastEntry;
+        lastEntry = null;
+        reuse.key = Long.MIN_VALUE;
+        reuse.value = null;
+        associatedMap.reuseAfterDelete = reuse;
+      }
+
+      associatedMap.elementCount--;
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private Entry<V>[] newElementArray(int s) {
+    return new Entry[s];
+  }
+
+  /**
+   * Constructs a new empty {@code HashMap} instance.
+   * 
+   * @since Android 1.0
+   */
+  public LongHashMap() {
+    this(16);
+  }
+
+  /**
+   * Constructs a new {@code HashMap} instance with the specified capacity.
+   * 
+   * @param capacity the initial capacity of this hash map.
+   * @throws IllegalArgumentException when the capacity is less than zero.
+   * @since Android 1.0
+   */
+  public LongHashMap(int capacity) {
+    defaultSize = capacity;
+    if (capacity >= 0) {
+      elementCount = 0;
+      elementData = newElementArray(capacity == 0 ? 1 : capacity);
+      loadFactor = 0.75f; // Default load factor of 0.75
+      computeMaxSize();
+    } else {
+      throw new IllegalArgumentException();
+    }
+  }
+
+  // BEGIN android-changed
+
+  /**
+   * Removes all mappings from this hash map, leaving it empty.
+   * 
+   * @see #isEmpty
+   * @see #size
+   * @since Android 1.0
+   */
+
+  public void clear() {
+    if (elementCount > 0) {
+      elementCount = 0;
+    }
+    if (elementData.length > 1024 && elementData.length > defaultSize)
+      elementData = new Entry[defaultSize];
+    else
+      Arrays.fill(elementData, null);
+    computeMaxSize();
+  }
+
+  // END android-changed
+
+  /**
+   * Computes the maximum number of elements this map can hold before it must be
+   * rehashed, based on the current capacity and load factor.
+   */
+
+  private void computeMaxSize() {
+    threshold = (int) (elementData.length * loadFactor);
+  }
+
+  /**
+   * Returns the value of the mapping with the specified key.
+   * 
+   * @param key the key.
+   * @return the value of the mapping with the specified key, or {@code null} if
+   *         no mapping for the specified key is found.
+   * @since Android 1.0
+   */
+
+  public V get(final long key) {
+
+    final int hash = powerHash(key);
+    final int index = (hash & 0x7FFFFFFF) % elementData.length;
+
+    // find non null entry
+    Entry<V> m = elementData[index];
+    while (m != null) {
+      if (key == m.key)
+        return m.value;
+      m = m.next;
+    }
+
+    return null;
+
+  }
+
+  /**
+   * Returns whether this map is empty.
+   * 
+   * @return {@code true} if this map has no elements, {@code false} otherwise.
+   * @see #size()
+   * @since Android 1.0
+   */
+
+  public boolean isEmpty() {
+    return elementCount == 0;
+  }
+
+  /**
+   * @return iterator over keys
+   */
+
+  // public Iterator<K> keyIterator(){
+  // return new HashMapIterator<K, K, V>(
+  // new MapEntry.Type<K, K, V>() {
+  // public K get(Entry<K, V> entry) {
+  // return entry.key;
+  // }
+  // }, HashMap.this);
+  //
+  // }
+
+  /**
+   * Maps the specified key to the specified value.
+   * 
+   * @param key the key.
+   * @param value the value.
+   * @return the value of any previous mapping with the specified key or
+   *         {@code null} if there was no such mapping.
+   * @since Android 1.0
+   */
+
+  public V put(final long key, final V value) {
+
+    int hash = powerHash(key);
+    int index = (hash & 0x7FFFFFFF) % elementData.length;
+
+    // find non null entry
+    Entry<V> entry = elementData[index];
+    while (entry != null && key != entry.key) {
+      entry = entry.next;
+    }
+
+    if (entry == null) {
+      if (++elementCount > threshold) {
+        rehash();
+        index = (hash & 0x7FFFFFFF) % elementData.length;
+      }
+      entry = createHashedEntry(key, index);
+    }
+
+    V result = entry.value;
+    entry.value = value;
+    return result;
+  }
+
+  Entry<V> createHashedEntry(final long key, final int index) {
+    Entry<V> entry = reuseAfterDelete;
+    if (entry == null) {
+      entry = new Entry<V>(key);
+    } else {
+      reuseAfterDelete = null;
+      entry.key = key;
+      entry.value = null;
+    }
+
+    entry.next = elementData[index];
+    elementData[index] = entry;
+    return entry;
+  }
+
+  void rehash(final int capacity) {
+    int length = (capacity == 0 ? 1 : capacity << 1);
+
+    Entry<V>[] newData = newElementArray(length);
+    for (int i = 0; i < elementData.length; i++) {
+      Entry<V> entry = elementData[i];
+      while (entry != null) {
+        int index = ((int) powerHash(entry.key) & 0x7FFFFFFF) % length;
+        Entry<V> next = entry.next;
+        entry.next = newData[index];
+        newData[index] = entry;
+        entry = next;
+      }
+    }
+    elementData = newData;
+    computeMaxSize();
+  }
+
+  void rehash() {
+    rehash(elementData.length);
+  }
+
+  /**
+   * Removes the mapping with the specified key from this map.
+   * 
+   * @param key the key of the mapping to remove.
+   * @return the value of the removed mapping or {@code null} if no mapping for
+   *         the specified key was found.
+   * @since Android 1.0
+   */
+
+  public V remove(final long key) {
+    Entry<V> entry = removeEntry(key);
+    if (entry == null)
+      return null;
+    V ret = entry.value;
+    entry.value = null;
+    entry.key = Long.MIN_VALUE;
+    reuseAfterDelete = entry;
+
+    return ret;
+  }
+
+  Entry<V> removeEntry(final long key) {
+    Entry<V> last = null;
+
+    final int hash = powerHash(key);
+    final int index = (hash & 0x7FFFFFFF) % elementData.length;
+    Entry<V> entry = elementData[index];
+
+    while (true) {
+      if (entry == null) {
+        return null;
+      }
+
+      if (key == entry.key) {
+        if (last == null) {
+          elementData[index] = entry.next;
+        } else {
+          last.next = entry.next;
+        }
+        elementCount--;
+        return entry;
+      }
+
+      last = entry;
+      entry = entry.next;
+    }
+  }
+
+  /**
+   * Returns the number of elements in this map.
+   * 
+   * @return the number of elements in this map.
+   * @since Android 1.0
+   */
+
+  public int size() {
+    return elementCount;
+  }
+
+  /**
+   * @return iterator over values in map
+   */
+  public Iterator<V> valuesIterator() {
+    return new HashMapIterator<V>(this);
+
+  }
+
+  static final private int powerHash(final long key) {
+    int h = (int) (key ^ (key >>> 32));
+    h ^= (h >>> 20) ^ (h >>> 12);
+    return h ^ (h >>> 7) ^ (h >>> 4);
+  }
+
+}

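A minimal usage sketch of the LongHashMap added above, exercising only the public members defined in this file (put, get, remove, size, valuesIterator); the class and method names come from the code, the rest is illustrative.

    import java.util.Iterator;
    import org.apache.hama.jdbm.LongHashMap;

    public class LongHashMapExample {
      public static void main(String[] args) {
        LongHashMap<String> map = new LongHashMap<String>();
        map.put(42L, "forty-two");            // primitive long keys, no Long boxing
        map.put(7L, "seven");
        System.out.println(map.get(42L));     // prints: forty-two
        System.out.println(map.size());       // prints: 2
        map.remove(7L);                       // removed Entry is cached for reuse
        for (Iterator<String> it = map.valuesIterator(); it.hasNext();) {
          System.out.println(it.next());      // prints the remaining value(s)
        }
      }
    }
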
Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/LongPacker.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/LongPacker.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/LongPacker.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/LongPacker.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Packing utility for non-negative <code>long</code> and <code>int</code> values.
+ * <p/>
+ * Originally developed for Kryo by Nathan Sweet. Modified for JDBM by Jan Kotek
+ */
+public final class LongPacker {
+
+  /**
+   * Pack non-negative long into output stream. It will occupy 1-10 bytes
+   * depending on value (lower values occupy smaller space)
+   * 
+   * @param os
+   * @param value
+   * @throws IOException
+   */
+  public static void packLong(DataOutput os, long value) throws IOException {
+
+    if (value < 0) {
+      throw new IllegalArgumentException("negative value: v=" + value);
+    }
+
+    while ((value & ~0x7FL) != 0) {
+      os.write((((int) value & 0x7F) | 0x80));
+      value >>>= 7;
+    }
+    os.write((byte) value);
+  }
+
+  /**
+   * Unpack non-negative long value from the input stream.
+   * 
+   * @param is The input stream.
+   * @return The long value.
+   * @throws java.io.IOException
+   */
+  public static long unpackLong(DataInput is) throws IOException {
+
+    long result = 0;
+    for (int offset = 0; offset < 64; offset += 7) {
+      long b = is.readUnsignedByte();
+      result |= (b & 0x7F) << offset;
+      if ((b & 0x80) == 0) {
+        return result;
+      }
+    }
+    throw new Error("Malformed long.");
+  }
+
+  /**
+   * Pack non-negative int into output stream. It will occupy 1-5 bytes
+   * depending on value (lower values occupy smaller space)
+   * 
+   * @param os
+   * @param value
+   * @throws IOException
+   */
+  public static void packInt(DataOutput os, int value) throws IOException {
+
+    if (value < 0) {
+      throw new IllegalArgumentException("negative value: v=" + value);
+    }
+
+    while ((value & ~0x7F) != 0) {
+      os.write(((value & 0x7F) | 0x80));
+      value >>>= 7;
+    }
+
+    os.write((byte) value);
+  }
+
+  public static int unpackInt(DataInput is) throws IOException {
+    for (int offset = 0, result = 0; offset < 32; offset += 7) {
+      int b = is.readUnsignedByte();
+      result |= (b & 0x7F) << offset;
+      if ((b & 0x80) == 0) {
+        return result;
+      }
+    }
+    throw new Error("Malformed integer.");
+
+  }
+
+}

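A self-contained round-trip sketch for the LongPacker added above; the byte counts in the comments follow directly from the 7-bits-per-byte encoding used by packLong/packInt.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hama.jdbm.LongPacker;

    public class LongPackerExample {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        LongPacker.packLong(out, 300L);   // needs two 7-bit groups -> 2 bytes
        LongPacker.packInt(out, 5);       // values below 128 fit in 1 byte
        out.flush();
        System.out.println(bytes.size()); // prints: 3

        DataInputStream in = new DataInputStream(
            new ByteArrayInputStream(bytes.toByteArray()));
        System.out.println(LongPacker.unpackLong(in)); // prints: 300
        System.out.println(LongPacker.unpackInt(in));  // prints: 5
      }
    }
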
Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/Magic.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/Magic.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/Magic.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/Magic.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+/**
+ * This interface contains magic cookies.
+ */
+public interface Magic {
+  /**
+   * Magic cookie at start of file
+   */
+  short FILE_HEADER = 0x1350;
+
+  /**
+   * Magic for pages. They're offset by the page type magic codes.
+   */
+  short PAGE_MAGIC = 0x1351;
+
+  /**
+   * Magics for pages in certain lists.
+   */
+  short FREE_PAGE = 0;
+  short USED_PAGE = 1;
+  short TRANSLATION_PAGE = 2;
+  short FREELOGIDS_PAGE = 3;
+  short FREEPHYSIDS_PAGE = 4;
+  short FREEPHYSIDS_ROOT_PAGE = 5;
+
+  /**
+   * Number of lists in a file
+   */
+  short NLISTS = 6;
+
+  /**
+   * Magic for transaction file
+   */
+  short LOGFILE_HEADER = 0x1360;
+
+  /**
+   * Size of an externalized byte
+   */
+  short SZ_BYTE = 1;
+  /**
+   * Size of an externalized short
+   */
+  short SZ_SHORT = 2;
+
+  /**
+   * Size of an externalized int
+   */
+  short SZ_INT = 4;
+  /**
+   * Size of an externalized long
+   */
+  short SZ_LONG = 8;
+
+  /**
+   * size of three byte integer
+   */
+  short SZ_SIX_BYTE_LONG = 6;
+
+  /** offsets in file header (zero page in file) */
+  short FILE_HEADER_O_MAGIC = 0; // short magic
+  short FILE_HEADER_O_LISTS = Magic.SZ_SHORT; // long[2*NLISTS]
+  int FILE_HEADER_O_ROOTS = FILE_HEADER_O_LISTS
+      + (Magic.NLISTS * 2 * Magic.SZ_LONG);
+  /**
+   * The number of "root" rowids available in the file.
+   */
+  int FILE_HEADER_NROOTS = 16;
+
+  short PAGE_HEADER_O_MAGIC = 0; // short magic
+  short PAGE_HEADER_O_NEXT = Magic.SZ_SHORT;
+  short PAGE_HEADER_O_PREV = PAGE_HEADER_O_NEXT + Magic.SZ_SIX_BYTE_LONG;
+  short PAGE_HEADER_SIZE = PAGE_HEADER_O_PREV + Magic.SZ_SIX_BYTE_LONG;
+
+  short PhysicalRowId_O_LOCATION = 0; // long page
+  // short PhysicalRowId_O_OFFSET = Magic.SZ_SIX_BYTE_LONG; // short offset
+  int PhysicalRowId_SIZE = Magic.SZ_SIX_BYTE_LONG;
+
+  short DATA_PAGE_O_FIRST = PAGE_HEADER_SIZE; // short firstrowid
+  short DATA_PAGE_O_DATA = (short) (DATA_PAGE_O_FIRST + Magic.SZ_SHORT);
+  short DATA_PER_PAGE = (short) (Storage.PAGE_SIZE - DATA_PAGE_O_DATA);
+
+}

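Plugging in the constants above, the derived offsets work out as follows: FILE_HEADER_O_LISTS = SZ_SHORT = 2, the list table of long[2 * NLISTS] occupies 6 * 2 * 8 = 96 bytes, so FILE_HEADER_O_ROOTS = 2 + 96 = 98. Similarly PAGE_HEADER_O_NEXT = 2, PAGE_HEADER_O_PREV = 2 + 6 = 8 and PAGE_HEADER_SIZE = 8 + 6 = 14, which puts DATA_PAGE_O_FIRST at 14, DATA_PAGE_O_DATA at 16, and leaves Storage.PAGE_SIZE - 16 bytes of payload per data page (PAGE_SIZE itself is defined in Storage, elsewhere in this commit).
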
Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/PageFile.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/PageFile.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/PageFile.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/PageFile.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,382 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.io.IOError;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Iterator;
+
+import javax.crypto.Cipher;
+
+/**
+ * This class represents a random access file as a set of fixed size records.
+ * Each record has a physical record number, and records are cached in order to
+ * improve access.
+ * <p/>
+ * The set of dirty records on the in-use list constitutes a transaction. Later
+ * on, these records are handed to the transaction manager for recovery.
+ * <p/>
+ * PageFile is split across multiple files, each with a maximum size of 1GB.
+ */
+public final class PageFile {
+  final PageTransactionManager txnMgr;
+
+  /**
+   * Pages currently locked for read/update ops. When released the page goes to
+   * the dirty or clean list, depending on a flag. The file header page is
+   * normally locked plus the page that is currently being read or modified.
+   * 
+   * @see PageIo#isDirty()
+   */
+  private final LongHashMap<PageIo> inUse = new LongHashMap<PageIo>();
+
+  /**
+   * Pages whose state is dirty.
+   */
+  private final LongHashMap<PageIo> dirty = new LongHashMap<PageIo>();
+  /**
+   * Pages in a <em>historical</em> transaction(s) that have been written onto
+   * the log but which have not yet been committed to the database.
+   */
+  private final LongHashMap<PageIo> inTxn = new LongHashMap<PageIo>();
+
+  // transactions disabled?
+  final boolean transactionsDisabled;
+
+  /**
+   * An array of clean data used to wipe clean pages.
+   */
+  static final byte[] CLEAN_DATA = new byte[Storage.PAGE_SIZE];
+
+  final Storage storage;
+  private Cipher cipherOut;
+  private Cipher cipherIn;
+
+  /**
+   * Creates a new object on the indicated filename. The file is opened in
+   * read/write mode.
+   * 
+   * @param fileName the name of the file to open or create, without an
+   *          extension.
+   * @throws IOException whenever the creation of the underlying
+   *           RandomAccessFile throws it.
+   */
+  PageFile(String fileName, boolean readonly, boolean transactionsDisabled,
+      Cipher cipherIn, Cipher cipherOut, boolean useRandomAccessFile,
+      boolean lockingDisabled) throws IOException {
+    this.cipherIn = cipherIn;
+    this.cipherOut = cipherOut;
+    this.transactionsDisabled = transactionsDisabled;
+    if (fileName == null) {
+      this.storage = new StorageMemory(transactionsDisabled);
+    } else if (useRandomAccessFile) {
+      this.storage = new StorageDisk(fileName, readonly, lockingDisabled);
+    } else {
+      this.storage = new StorageDiskMapped(fileName, readonly,
+          transactionsDisabled, lockingDisabled);
+    }
+    if (this.storage.isReadonly() && !readonly)
+      throw new IllegalArgumentException(
+          "This type of storage is readonly, you should call readonly() on DBMaker");
+    if (!readonly && !transactionsDisabled) {
+      txnMgr = new PageTransactionManager(this, storage, cipherIn, cipherOut);
+    } else {
+      txnMgr = null;
+    }
+  }
+
+  public PageFile(String filename) throws IOException {
+    this(filename, false, false, null, null, false, false);
+  }
+
+  /**
+   * Gets a page from the file. The returned PageIo is the in-memory copy of
+   * the page, and thus can be written to (and subsequently released with a dirty
+   * flag in order to write the page back). If transactions are disabled,
+   * changes may be written directly to storage.
+   * 
+   * @param pageId The record number to retrieve.
+   */
+  PageIo get(long pageId) throws IOException {
+
+    // try in transaction list, dirty list, free list
+    PageIo node = inTxn.get(pageId);
+    if (node != null) {
+      inTxn.remove(pageId);
+      inUse.put(pageId, node);
+      return node;
+    }
+    node = dirty.get(pageId);
+    if (node != null) {
+      dirty.remove(pageId);
+      inUse.put(pageId, node);
+      return node;
+    }
+
+    // sanity check: can't be on in use list
+    if (inUse.get(pageId) != null) {
+      throw new Error("double get for page " + pageId);
+    }
+
+    // read node from file
+    if (cipherOut == null) {
+      node = new PageIo(pageId, storage.read(pageId));
+    } else {
+      // decrypt if needed
+      ByteBuffer b = storage.read(pageId);
+      byte[] bb;
+      if (b.hasArray()) {
+        bb = b.array();
+      } else {
+        bb = new byte[Storage.PAGE_SIZE];
+        b.position(0);
+        b.get(bb, 0, Storage.PAGE_SIZE);
+      }
+      if (!JDBMUtils.allZeros(bb))
+        try {
+          bb = cipherOut.doFinal(bb);
+          node = new PageIo(pageId, ByteBuffer.wrap(bb));
+        } catch (Exception e) {
+          throw new IOError(e);
+        }
+      else {
+        node = new PageIo(pageId, ByteBuffer.wrap(PageFile.CLEAN_DATA)
+            .asReadOnlyBuffer());
+      }
+    }
+
+    inUse.put(pageId, node);
+    node.setClean();
+    return node;
+  }
+
+  /**
+   * Releases a page.
+   * 
+   * @param pageId The record number to release.
+   * @param isDirty If true, the page was modified since the get().
+   */
+  void release(final long pageId, final boolean isDirty) throws IOException {
+
+    final PageIo page = inUse.remove(pageId);
+    if (!page.isDirty() && isDirty)
+      page.setDirty();
+
+    if (page.isDirty()) {
+      dirty.put(pageId, page);
+    } else if (!transactionsDisabled && page.isInTransaction()) {
+      inTxn.put(pageId, page);
+    }
+  }
+
+  /**
+   * Releases a page.
+   * 
+   * @param page The page to release.
+   */
+  void release(final PageIo page) throws IOException {
+    final long key = page.getPageId();
+    inUse.remove(key);
+    if (page.isDirty()) {
+      // System.out.println( "Dirty: " + key + page );
+      dirty.put(key, page);
+    } else if (!transactionsDisabled && page.isInTransaction()) {
+      inTxn.put(key, page);
+    }
+  }
+
+  /**
+   * Discards a page (will not write the page even if it's dirty)
+   * 
+   * @param page The page to discard.
+   */
+  void discard(PageIo page) {
+    long key = page.getPageId();
+    inUse.remove(key);
+  }
+
+  /**
+   * Commits the current transaction by flushing all dirty buffers to disk.
+   */
+  void commit() throws IOException {
+    // debugging...
+    if (!inUse.isEmpty() && inUse.size() > 1) {
+      showList(inUse.valuesIterator());
+      throw new Error("in use list not empty at commit time (" + inUse.size()
+          + ")");
+    }
+
+    // System.out.println("committing...");
+
+    if (dirty.size() == 0) {
+      // if no dirty pages, skip commit process
+      return;
+    }
+
+    if (!transactionsDisabled) {
+      txnMgr.start();
+    }
+
+    // sort pages by IDs
+    long[] pageIds = new long[dirty.size()];
+    int c = 0;
+    for (Iterator<PageIo> i = dirty.valuesIterator(); i.hasNext();) {
+      pageIds[c] = i.next().getPageId();
+      c++;
+    }
+    Arrays.sort(pageIds);
+
+    for (long pageId : pageIds) {
+      PageIo node = dirty.get(pageId);
+
+      // System.out.println("node " + node + " map size now " + dirty.size());
+      if (transactionsDisabled) {
+        if (cipherIn != null)
+          storage.write(node.getPageId(),
+              ByteBuffer.wrap(JDBMUtils.encrypt(cipherIn, node.getData())));
+        else
+          storage.write(node.getPageId(), node.getData());
+        node.setClean();
+      } else {
+        txnMgr.add(node);
+        inTxn.put(node.getPageId(), node);
+      }
+    }
+    dirty.clear();
+    if (!transactionsDisabled) {
+      txnMgr.commit();
+    }
+  }
+
+  /**
+   * Rollback the current transaction by discarding all dirty buffers
+   */
+  void rollback() throws IOException {
+    // debugging...
+    if (!inUse.isEmpty()) {
+      showList(inUse.valuesIterator());
+      throw new Error("in use list not empty at rollback time (" + inUse.size()
+          + ")");
+    }
+    // System.out.println("rollback...");
+    dirty.clear();
+
+    txnMgr.synchronizeLogFromDisk();
+
+    if (!inTxn.isEmpty()) {
+      showList(inTxn.valuesIterator());
+      throw new Error("in txn list not empty at rollback time (" + inTxn.size()
+          + ")");
+    }
+  }
+
+  /**
+   * Commits and closes file.
+   */
+  void close() throws IOException {
+    if (!dirty.isEmpty()) {
+      commit();
+    }
+
+    if (!transactionsDisabled && txnMgr != null) {
+      txnMgr.shutdown();
+    }
+
+    if (!inTxn.isEmpty()) {
+      showList(inTxn.valuesIterator());
+      throw new Error("In transaction not empty");
+    }
+
+    // these actually aren't that bad in a production release
+    if (!dirty.isEmpty()) {
+      System.out.println("ERROR: dirty pages at close time");
+      showList(dirty.valuesIterator());
+      throw new Error("Dirty pages at close time");
+    }
+    if (!inUse.isEmpty()) {
+      System.out.println("ERROR: inUse pages at close time");
+      showList(inUse.valuesIterator());
+      throw new Error("inUse pages  at close time");
+    }
+
+    storage.sync();
+    storage.forceClose();
+  }
+
+  /**
+   * Force closing the file and underlying transaction manager. Used for testing
+   * purposes only.
+   */
+  void forceClose() throws IOException {
+    if (!transactionsDisabled) {
+      txnMgr.forceClose();
+    }
+    storage.forceClose();
+  }
+
+  /**
+   * Prints contents of a list
+   */
+  private void showList(Iterator<PageIo> i) {
+    int cnt = 0;
+    while (i.hasNext()) {
+      System.out.println("elem " + cnt + ": " + i.next());
+      cnt++;
+    }
+  }
+
+  /**
+   * Syncs a node to disk. This is called by the transaction manager's
+   * synchronization code.
+   */
+  void synch(PageIo node) throws IOException {
+    ByteBuffer data = node.getData();
+    if (data != null) {
+      if (cipherIn != null)
+        storage.write(node.getPageId(),
+            ByteBuffer.wrap(JDBMUtils.encrypt(cipherIn, data)));
+      else
+        storage.write(node.getPageId(), data);
+    }
+  }
+
+  /**
+   * Releases a node from the transaction list, if it was sitting there.
+   */
+  void releaseFromTransaction(PageIo node) throws IOException {
+    inTxn.remove(node.getPageId());
+  }
+
+  /**
+   * Synchronizes the file.
+   */
+  void sync() throws IOException {
+    storage.sync();
+  }
+
+  public int getDirtyPageCount() {
+    return dirty.size();
+  }
+
+  public void deleteAllFiles() throws IOException {
+    storage.deleteAllFiles();
+  }
+}
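
A hedged lifecycle sketch for the PageFile added above. Because get/release/commit/close are package-private, the example sits in the same package; it passes a null filename so the in-memory storage path is taken, and it relies on PageIo.writeShort, which this commit calls elsewhere. Treat it as an illustration of the get -> release(dirty) -> commit flow under those assumptions, not a supported public API.

    package org.apache.hama.jdbm;

    import java.io.IOException;

    public class PageFileExample {
      public static void main(String[] args) throws IOException {
        PageFile file = new PageFile(null);                   // null filename -> in-memory storage
        PageIo page = file.get(1);                            // lock page 1 for update
        page.writeShort(Magic.PAGE_HEADER_SIZE, (short) 42);  // write into the page body
        file.release(1, true);                                // release it as dirty
        System.out.println(file.getDirtyPageCount());         // prints: 1
        file.commit();                                        // flushes dirty pages (through txnMgr here)
        file.close();                                         // commits anything left, then closes
      }
    }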