You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2012/09/19 13:52:24 UTC

svn commit: r1387533 [6/10] - in /hama/trunk: ./ core/ core/src/main/java/org/apache/hama/bsp/ graph/ graph/src/main/java/org/apache/hama/graph/ jdbm/ jdbm/src/ jdbm/src/main/ jdbm/src/main/java/ jdbm/src/main/java/org/ jdbm/src/main/java/org/apache/ j...

Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/Serialization.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/Serialization.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/Serialization.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/Serialization.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,1362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Hashtable;
+import java.util.IdentityHashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.Vector;
+
+/**
+ * Serialization util. It reduces serialized data size for most common java
+ * types.
+ * <p/>
+ * Common pattern is one byte header which identifies data type, then size is
+ * written (if required) and data.
+ * <p/>
+ * On unknown types normal java serialization is used
+ * <p/>
+ * <p/>
+ * Header byte values bellow 180 are reserved by author for future use. If you
+ * want to customize this class, use values over 180, to be compatible with
+ * future updates.
+ */
+@SuppressWarnings({ "unchecked", "rawtypes" })
+public final class Serialization extends SerialClassInfo implements Serializer, SerializationHeader {
+
+  /**
+   * print statistics to STDOUT
+   */
+  static final boolean DEBUG = false;
+
+  static final String UTF8 = "UTF-8";
+
+  Serialization(DBAbstract db, long serialClassInfoRecid,
+      ArrayList<ClassInfo> info) throws IOException {
+    super(db, serialClassInfoRecid, info);
+  }
+
+  public Serialization() {
+    super(null, 0L, new ArrayList<ClassInfo>());
+    // Add java.lang.Object as registered class
+    registered.add(new ClassInfo(Object.class.getName(), new FieldInfo[] {},
+        false, false));
+  }
+
+  /**
+   * Serialize the object into a byte array.
+   */
+  public byte[] serialize(Object obj) throws IOException {
+    DataInputOutput ba = new DataInputOutput();
+
+    serialize(ba, obj);
+
+    return ba.toByteArray();
+  }
+
+  boolean isSerializable(Object obj) {
+    // TODO suboptimal code
+    try {
+      serialize(new DataOutputStream(new ByteArrayOutputStream()), obj);
+      return true;
+    } catch (Exception e) {
+      return false;
+    }
+  }
+
+  public void serialize(final DataOutput out, final Object obj)
+      throws IOException {
+    serialize(out, obj, null);
+  }
+
+  public void serialize(final DataOutput out, final Object obj,
+      FastArrayList objectStack) throws IOException {
+
+    /** try to find object on stack if it exists */
+    if (objectStack != null) {
+      int indexInObjectStack = objectStack.identityIndexOf(obj);
+      if (indexInObjectStack != -1) {
+        // object was already serialized, just write reference to it and return
+        out.write(OBJECT_STACK);
+        LongPacker.packInt(out, indexInObjectStack);
+        return;
+      }
+      // add this object to objectStack
+      objectStack.add(obj);
+    }
+
+    final Class clazz = obj != null ? obj.getClass() : null;
+
+    /** first try to serialize object without initializing object stack */
+    if (obj == null) {
+      out.write(NULL);
+      return;
+    } else if (clazz == Boolean.class) {
+      if (((Boolean) obj).booleanValue())
+        out.write(BOOLEAN_TRUE);
+      else
+        out.write(BOOLEAN_FALSE);
+      return;
+    } else if (clazz == Integer.class) {
+      final int val = (Integer) obj;
+      writeInteger(out, val);
+      return;
+    } else if (clazz == Double.class) {
+      double v = (Double) obj;
+      if (v == -1d)
+        out.write(DOUBLE_MINUS_1);
+      else if (v == 0d)
+        out.write(DOUBLE_0);
+      else if (v == 1d)
+        out.write(DOUBLE_1);
+      else if (v >= 0 && v <= 255 && (int) v == v) {
+        out.write(DOUBLE_255);
+        out.write((int) v);
+      } else if (v >= Short.MIN_VALUE && v <= Short.MAX_VALUE && (short) v == v) {
+        out.write(DOUBLE_SHORT);
+        out.writeShort((int) v);
+      } else {
+        out.write(DOUBLE_FULL);
+        out.writeDouble(v);
+      }
+      return;
+    } else if (clazz == Float.class) {
+      float v = (Float) obj;
+      if (v == -1f)
+        out.write(FLOAT_MINUS_1);
+      else if (v == 0f)
+        out.write(FLOAT_0);
+      else if (v == 1f)
+        out.write(FLOAT_1);
+      else if (v >= 0 && v <= 255 && (int) v == v) {
+        out.write(FLOAT_255);
+        out.write((int) v);
+      } else if (v >= Short.MIN_VALUE && v <= Short.MAX_VALUE && (short) v == v) {
+        out.write(FLOAT_SHORT);
+        out.writeShort((int) v);
+
+      } else {
+        out.write(FLOAT_FULL);
+        out.writeFloat(v);
+      }
+      return;
+    } else if (clazz == BigInteger.class) {
+      out.write(BIGINTEGER);
+      byte[] buf = ((BigInteger) obj).toByteArray();
+      serializeByteArrayInt(out, buf);
+      return;
+    } else if (clazz == BigDecimal.class) {
+      out.write(BIGDECIMAL);
+      BigDecimal d = (BigDecimal) obj;
+      serializeByteArrayInt(out, d.unscaledValue().toByteArray());
+      LongPacker.packInt(out, d.scale());
+      return;
+    } else if (clazz == Long.class) {
+      final long val = (Long) obj;
+      writeLong(out, val);
+      return;
+    } else if (clazz == Short.class) {
+      short val = (Short) obj;
+      if (val == -1)
+        out.write(SHORT_MINUS_1);
+      else if (val == 0)
+        out.write(SHORT_0);
+      else if (val == 1)
+        out.write(SHORT_1);
+      else if (val > 0 && val < 255) {
+        out.write(SHORT_255);
+        out.write(val);
+      } else {
+        out.write(SHORT_FULL);
+        out.writeShort(val);
+      }
+      return;
+    } else if (clazz == Byte.class) {
+      byte val = (Byte) obj;
+      if (val == -1)
+        out.write(BYTE_MINUS_1);
+      else if (val == 0)
+        out.write(BYTE_0);
+      else if (val == 1)
+        out.write(BYTE_1);
+      else {
+        out.write(SHORT_FULL);
+        out.writeByte(val);
+      }
+      return;
+    } else if (clazz == Character.class) {
+      out.write(CHAR);
+      out.writeChar((Character) obj);
+      return;
+    } else if (clazz == String.class) {
+      String s = (String) obj;
+      if (s.length() == 0) {
+        out.write(STRING_EMPTY);
+      } else {
+        out.write(STRING);
+        serializeString(out, s);
+      }
+      return;
+    } else if (obj instanceof Class) {
+      out.write(CLASS);
+      serialize(out, ((Class) obj).getName());
+      return;
+    } else if (obj instanceof int[]) {
+      writeIntArray(out, (int[]) obj);
+      return;
+    } else if (obj instanceof long[]) {
+      writeLongArray(out, (long[]) obj);
+      return;
+    } else if (obj instanceof short[]) {
+      out.write(SHORT_ARRAY);
+      short[] a = (short[]) obj;
+      LongPacker.packInt(out, a.length);
+      for (short s : a)
+        out.writeShort(s);
+      return;
+    } else if (obj instanceof boolean[]) {
+      out.write(BOOLEAN_ARRAY);
+      boolean[] a = (boolean[]) obj;
+      LongPacker.packInt(out, a.length);
+      for (boolean s : a)
+        out.writeBoolean(s); // TODO pack 8 booleans to single byte
+      return;
+    } else if (obj instanceof double[]) {
+      out.write(DOUBLE_ARRAY);
+      double[] a = (double[]) obj;
+      LongPacker.packInt(out, a.length);
+      for (double s : a)
+        out.writeDouble(s);
+      return;
+    } else if (obj instanceof float[]) {
+      out.write(FLOAT_ARRAY);
+      float[] a = (float[]) obj;
+      LongPacker.packInt(out, a.length);
+      for (float s : a)
+        out.writeFloat(s);
+      return;
+    } else if (obj instanceof char[]) {
+      out.write(CHAR_ARRAY);
+      char[] a = (char[]) obj;
+      LongPacker.packInt(out, a.length);
+      for (char s : a)
+        out.writeChar(s);
+      return;
+    } else if (obj instanceof byte[]) {
+      byte[] b = (byte[]) obj;
+      out.write(ARRAY_BYTE_INT);
+      serializeByteArrayInt(out, b);
+      return;
+    } else if (clazz == Date.class) {
+      out.write(DATE);
+      out.writeLong(((Date) obj).getTime());
+      return;
+    } else if (clazz == UUID.class) {
+      out.write(UUID);
+      serializeUUID(out, (UUID) obj);
+      return;
+    } else if (clazz == BTree.class) {
+      out.write(BTREE);
+      ((BTree) obj).writeExternal(out);
+      return;
+    } else if (clazz == HTree.class) {
+      out.write(HTREE);
+      ((HTree) obj).serialize(out);
+      return;
+    } else if (clazz == LinkedList.class) {
+      out.write(JDBMLINKEDLIST);
+      ((LinkedList) obj).serialize(out);
+      return;
+    }
+
+    /**
+     * classes bellow need object stack, so initialize it if not alredy
+     * initialized
+     */
+    if (objectStack == null) {
+      objectStack = new FastArrayList();
+      objectStack.add(obj);
+    }
+
+    if (obj instanceof Object[]) {
+      Object[] b = (Object[]) obj;
+      boolean packableLongs = b.length <= 255;
+      if (packableLongs) {
+        // check if it contains packable longs
+        for (Object o : b) {
+          if (o != null
+              && (o.getClass() != Long.class || (((Long) o).longValue() < 0 && ((Long) o)
+                  .longValue() != Long.MAX_VALUE))) {
+            packableLongs = false;
+            break;
+          }
+        }
+      }
+
+      if (packableLongs) {
+        // packable Longs is special case, it is often used in JDBM to reference
+        // fields
+        out.write(ARRAY_OBJECT_PACKED_LONG);
+        out.write(b.length);
+        for (Object o : b) {
+          if (o == null)
+            LongPacker.packLong(out, 0);
+          else
+            LongPacker.packLong(out, ((Long) o).longValue() + 1);
+        }
+
+      } else {
+        out.write(ARRAY_OBJECT);
+        LongPacker.packInt(out, b.length);
+
+        // Write class id for components
+        Class<?> componentType = obj.getClass().getComponentType();
+        registerClass(componentType);
+        // write class header
+        int classId = getClassId(componentType);
+        LongPacker.packInt(out, classId);
+
+        for (Object o : b)
+          serialize(out, o, objectStack);
+
+      }
+
+    } else if (clazz == ArrayList.class) {
+      ArrayList l = (ArrayList) obj;
+      boolean packableLongs = l.size() < 255;
+      if (packableLongs) {
+        // packable Longs is special case, it is often used in JDBM to reference
+        // fields
+        for (Object o : l) {
+          if (o != null
+              && (o.getClass() != Long.class || (((Long) o).longValue() < 0 && ((Long) o)
+                  .longValue() != Long.MAX_VALUE))) {
+            packableLongs = false;
+            break;
+          }
+        }
+      }
+      if (packableLongs) {
+        out.write(ARRAYLIST_PACKED_LONG);
+        out.write(l.size());
+        for (Object o : l) {
+          if (o == null)
+            LongPacker.packLong(out, 0);
+          else
+            LongPacker.packLong(out, ((Long) o).longValue() + 1);
+        }
+      } else {
+        serializeCollection(ARRAYLIST, out, obj, objectStack);
+      }
+
+    } else if (clazz == java.util.LinkedList.class) {
+      serializeCollection(LINKEDLIST, out, obj, objectStack);
+    } else if (clazz == Vector.class) {
+      serializeCollection(VECTOR, out, obj, objectStack);
+    } else if (clazz == TreeSet.class) {
+      TreeSet l = (TreeSet) obj;
+      out.write(TREESET);
+      LongPacker.packInt(out, l.size());
+      serialize(out, l.comparator(), objectStack);
+      for (Object o : l)
+        serialize(out, o, objectStack);
+    } else if (clazz == HashSet.class) {
+      serializeCollection(HASHSET, out, obj, objectStack);
+    } else if (clazz == LinkedHashSet.class) {
+      serializeCollection(LINKEDHASHSET, out, obj, objectStack);
+    } else if (clazz == TreeMap.class) {
+      TreeMap l = (TreeMap) obj;
+      out.write(TREEMAP);
+      LongPacker.packInt(out, l.size());
+      serialize(out, l.comparator(), objectStack);
+      for (Object o : l.keySet()) {
+        serialize(out, o, objectStack);
+        serialize(out, l.get(o), objectStack);
+      }
+    } else if (clazz == HashMap.class) {
+      serializeMap(HASHMAP, out, obj, objectStack);
+    } else if (clazz == IdentityHashMap.class) {
+      serializeMap(IDENTITYHASHMAP, out, obj, objectStack);
+    } else if (clazz == LinkedHashMap.class) {
+      serializeMap(LINKEDHASHMAP, out, obj, objectStack);
+    } else if (clazz == Hashtable.class) {
+      serializeMap(HASHTABLE, out, obj, objectStack);
+    } else if (clazz == Properties.class) {
+      serializeMap(PROPERTIES, out, obj, objectStack);
+    } else if (clazz == Locale.class) {
+      out.write(LOCALE);
+      Locale l = (Locale) obj;
+      out.writeUTF(l.getLanguage());
+      out.writeUTF(l.getCountry());
+      out.writeUTF(l.getVariant());
+    } else {
+      out.write(NORMAL);
+      writeObject(out, obj, objectStack);
+    }
+
+  }
+
+  static void serializeString(DataOutput out, String obj) throws IOException {
+    final int len = obj.length();
+    LongPacker.packInt(out, len);
+    for (int i = 0; i < len; i++) {
+      int c = (int) obj.charAt(i); // TODO investigate if c could be negative
+                                   // here
+      LongPacker.packInt(out, c);
+    }
+
+  }
+
+  private void serializeUUID(DataOutput out, UUID uuid) throws IOException {
+    out.writeLong(uuid.getMostSignificantBits());
+    out.writeLong(uuid.getLeastSignificantBits());
+  }
+
+  private void serializeMap(int header, DataOutput out, Object obj,
+      FastArrayList objectStack) throws IOException {
+    Map l = (Map) obj;
+    out.write(header);
+    LongPacker.packInt(out, l.size());
+    for (Object o : l.keySet()) {
+      serialize(out, o, objectStack);
+      serialize(out, l.get(o), objectStack);
+    }
+  }
+
+  private void serializeCollection(int header, DataOutput out, Object obj,
+      FastArrayList objectStack) throws IOException {
+    Collection l = (Collection) obj;
+    out.write(header);
+    LongPacker.packInt(out, l.size());
+
+    for (Object o : l)
+      serialize(out, o, objectStack);
+
+  }
+
+  private void serializeByteArrayInt(DataOutput out, byte[] b)
+      throws IOException {
+    LongPacker.packInt(out, b.length);
+    out.write(b);
+  }
+
+  private void writeLongArray(DataOutput da, long[] obj) throws IOException {
+    long max = Long.MIN_VALUE;
+    long min = Long.MAX_VALUE;
+    for (long i : obj) {
+      max = Math.max(max, i);
+      min = Math.min(min, i);
+    }
+
+    if (0 <= min && max <= 255) {
+      da.write(ARRAY_LONG_B);
+      LongPacker.packInt(da, obj.length);
+      for (long l : obj)
+        da.write((int) l);
+    } else if (0 <= min && max <= Long.MAX_VALUE) {
+      da.write(ARRAY_LONG_PACKED);
+      LongPacker.packInt(da, obj.length);
+      for (long l : obj)
+        LongPacker.packLong(da, l);
+    } else if (Short.MIN_VALUE <= min && max <= Short.MAX_VALUE) {
+      da.write(ARRAY_LONG_S);
+      LongPacker.packInt(da, obj.length);
+      for (long l : obj)
+        da.writeShort((short) l);
+    } else if (Integer.MIN_VALUE <= min && max <= Integer.MAX_VALUE) {
+      da.write(ARRAY_LONG_I);
+      LongPacker.packInt(da, obj.length);
+      for (long l : obj)
+        da.writeInt((int) l);
+    } else {
+      da.write(ARRAY_LONG_L);
+      LongPacker.packInt(da, obj.length);
+      for (long l : obj)
+        da.writeLong(l);
+    }
+
+  }
+
+  private void writeIntArray(DataOutput da, int[] obj) throws IOException {
+    int max = Integer.MIN_VALUE;
+    int min = Integer.MAX_VALUE;
+    for (int i : obj) {
+      max = Math.max(max, i);
+      min = Math.min(min, i);
+    }
+
+    boolean fitsInByte = 0 <= min && max <= 255;
+    boolean fitsInShort = Short.MIN_VALUE >= min && max <= Short.MAX_VALUE;
+
+    if (obj.length <= 255 && fitsInByte) {
+      da.write(ARRAY_INT_B_255);
+      da.write(obj.length);
+      for (int i : obj)
+        da.write(i);
+    } else if (fitsInByte) {
+      da.write(ARRAY_INT_B_INT);
+      LongPacker.packInt(da, obj.length);
+      for (int i : obj)
+        da.write(i);
+    } else if (0 <= min && max <= Integer.MAX_VALUE) {
+      da.write(ARRAY_INT_PACKED);
+      LongPacker.packInt(da, obj.length);
+      for (int l : obj)
+        LongPacker.packInt(da, l);
+    } else if (fitsInShort) {
+      da.write(ARRAY_INT_S);
+      LongPacker.packInt(da, obj.length);
+      for (int i : obj)
+        da.writeShort(i);
+    } else {
+      da.write(ARRAY_INT_I);
+      LongPacker.packInt(da, obj.length);
+      for (int i : obj)
+        da.writeInt(i);
+    }
+
+  }
+
+  private void writeInteger(DataOutput da, final int val) throws IOException {
+    if (val == -1)
+      da.write(INTEGER_MINUS_1);
+    else if (val == 0)
+      da.write(INTEGER_0);
+    else if (val == 1)
+      da.write(INTEGER_1);
+    else if (val == 2)
+      da.write(INTEGER_2);
+    else if (val == 3)
+      da.write(INTEGER_3);
+    else if (val == 4)
+      da.write(INTEGER_4);
+    else if (val == 5)
+      da.write(INTEGER_5);
+    else if (val == 6)
+      da.write(INTEGER_6);
+    else if (val == 7)
+      da.write(INTEGER_7);
+    else if (val == 8)
+      da.write(INTEGER_8);
+    else if (val == Integer.MIN_VALUE)
+      da.write(INTEGER_MINUS_MAX);
+    else if (val > 0 && val < 255) {
+      da.write(INTEGER_255);
+      da.write(val);
+    } else if (val < 0) {
+      da.write(INTEGER_PACK_NEG);
+      LongPacker.packInt(da, -val);
+    } else {
+      da.write(INTEGER_PACK);
+      LongPacker.packInt(da, val);
+    }
+  }
+
+  private void writeLong(DataOutput da, final long val) throws IOException {
+    if (val == -1)
+      da.write(LONG_MINUS_1);
+    else if (val == 0)
+      da.write(LONG_0);
+    else if (val == 1)
+      da.write(LONG_1);
+    else if (val == 2)
+      da.write(LONG_2);
+    else if (val == 3)
+      da.write(LONG_3);
+    else if (val == 4)
+      da.write(LONG_4);
+    else if (val == 5)
+      da.write(LONG_5);
+    else if (val == 6)
+      da.write(LONG_6);
+    else if (val == 7)
+      da.write(LONG_7);
+    else if (val == 8)
+      da.write(LONG_8);
+    else if (val == Long.MIN_VALUE)
+      da.write(LONG_MINUS_MAX);
+    else if (val > 0 && val < 255) {
+      da.write(LONG_255);
+      da.write((int) val);
+    } else if (val < 0) {
+      da.write(LONG_PACK_NEG);
+      LongPacker.packLong(da, -val);
+    } else {
+      da.write(LONG_PACK);
+      LongPacker.packLong(da, val);
+    }
+  }
+
+  /**
+   * Deserialize an object from a byte array
+   * 
+   * @throws IOException
+   * @throws ClassNotFoundException
+   */
+  public Object deserialize(byte[] buf) throws ClassNotFoundException,
+      IOException {
+    DataInputOutput bs = new DataInputOutput(buf);
+    Object ret = deserialize(bs);
+    if (bs.available() != 0)
+      throw new InternalError("bytes left: " + bs.available());
+
+    return ret;
+  }
+
+  static String deserializeString(DataInput buf) throws IOException {
+    int len = LongPacker.unpackInt(buf);
+    char[] b = new char[len];
+    for (int i = 0; i < len; i++)
+      b[i] = (char) LongPacker.unpackInt(buf);
+
+    return new String(b);
+  }
+
+  public Object deserialize(DataInput is) throws IOException,
+      ClassNotFoundException {
+    return deserialize(is, null);
+  }
+
+  public Object deserialize(DataInput is, FastArrayList objectStack)
+      throws IOException, ClassNotFoundException {
+
+    Object ret = null;
+
+    final int head = is.readUnsignedByte();
+
+    /** first try to deserialize object without allocating object stack */
+    switch (head) {
+      case NULL:
+        break;
+      case BOOLEAN_TRUE:
+        ret = Boolean.TRUE;
+        break;
+      case BOOLEAN_FALSE:
+        ret = Boolean.FALSE;
+        break;
+      case INTEGER_MINUS_1:
+        ret = Integer.valueOf(-1);
+        break;
+      case INTEGER_0:
+        ret = Integer.valueOf(0);
+        break;
+      case INTEGER_1:
+        ret = Integer.valueOf(1);
+        break;
+      case INTEGER_2:
+        ret = Integer.valueOf(2);
+        break;
+      case INTEGER_3:
+        ret = Integer.valueOf(3);
+        break;
+      case INTEGER_4:
+        ret = Integer.valueOf(4);
+        break;
+      case INTEGER_5:
+        ret = Integer.valueOf(5);
+        break;
+      case INTEGER_6:
+        ret = Integer.valueOf(6);
+        break;
+      case INTEGER_7:
+        ret = Integer.valueOf(7);
+        break;
+      case INTEGER_8:
+        ret = Integer.valueOf(8);
+        break;
+      case INTEGER_MINUS_MAX:
+        ret = Integer.valueOf(Integer.MIN_VALUE);
+        break;
+      case INTEGER_255:
+        ret = Integer.valueOf(is.readUnsignedByte());
+        break;
+      case INTEGER_PACK_NEG:
+        ret = Integer.valueOf(-LongPacker.unpackInt(is));
+        break;
+      case INTEGER_PACK:
+        ret = Integer.valueOf(LongPacker.unpackInt(is));
+        break;
+      case LONG_MINUS_1:
+        ret = Long.valueOf(-1);
+        break;
+      case LONG_0:
+        ret = Long.valueOf(0);
+        break;
+      case LONG_1:
+        ret = Long.valueOf(1);
+        break;
+      case LONG_2:
+        ret = Long.valueOf(2);
+        break;
+      case LONG_3:
+        ret = Long.valueOf(3);
+        break;
+      case LONG_4:
+        ret = Long.valueOf(4);
+        break;
+      case LONG_5:
+        ret = Long.valueOf(5);
+        break;
+      case LONG_6:
+        ret = Long.valueOf(6);
+        break;
+      case LONG_7:
+        ret = Long.valueOf(7);
+        break;
+      case LONG_8:
+        ret = Long.valueOf(8);
+        break;
+      case LONG_255:
+        ret = Long.valueOf(is.readUnsignedByte());
+        break;
+      case LONG_PACK_NEG:
+        ret = Long.valueOf(-LongPacker.unpackLong(is));
+        break;
+      case LONG_PACK:
+        ret = Long.valueOf(LongPacker.unpackLong(is));
+        break;
+      case LONG_MINUS_MAX:
+        ret = Long.valueOf(Long.MIN_VALUE);
+        break;
+      case SHORT_MINUS_1:
+        ret = Short.valueOf((short) -1);
+        break;
+      case SHORT_0:
+        ret = Short.valueOf((short) 0);
+        break;
+      case SHORT_1:
+        ret = Short.valueOf((short) 1);
+        break;
+      case SHORT_255:
+        ret = Short.valueOf((short) is.readUnsignedByte());
+        break;
+      case SHORT_FULL:
+        ret = Short.valueOf(is.readShort());
+        break;
+      case BYTE_MINUS_1:
+        ret = Byte.valueOf((byte) -1);
+        break;
+      case BYTE_0:
+        ret = Byte.valueOf((byte) 0);
+        break;
+      case BYTE_1:
+        ret = Byte.valueOf((byte) 1);
+        break;
+      case BYTE_FULL:
+        ret = Byte.valueOf(is.readByte());
+        break;
+      case SHORT_ARRAY:
+        int size = LongPacker.unpackInt(is);
+        ret = new short[size];
+        for (int i = 0; i < size; i++)
+          ((short[]) ret)[i] = is.readShort();
+        break;
+      case BOOLEAN_ARRAY:
+        size = LongPacker.unpackInt(is);
+        ret = new boolean[size];
+        for (int i = 0; i < size; i++)
+          ((boolean[]) ret)[i] = is.readBoolean();
+        break;
+      case DOUBLE_ARRAY:
+        size = LongPacker.unpackInt(is);
+        ret = new double[size];
+        for (int i = 0; i < size; i++)
+          ((double[]) ret)[i] = is.readDouble();
+        break;
+      case FLOAT_ARRAY:
+        size = LongPacker.unpackInt(is);
+        ret = new float[size];
+        for (int i = 0; i < size; i++)
+          ((float[]) ret)[i] = is.readFloat();
+        break;
+      case CHAR_ARRAY:
+        size = LongPacker.unpackInt(is);
+        ret = new char[size];
+        for (int i = 0; i < size; i++)
+          ((char[]) ret)[i] = is.readChar();
+        break;
+      case CHAR:
+        ret = Character.valueOf(is.readChar());
+        break;
+      case FLOAT_MINUS_1:
+        ret = Float.valueOf(-1);
+        break;
+      case FLOAT_0:
+        ret = Float.valueOf(0);
+        break;
+      case FLOAT_1:
+        ret = Float.valueOf(1);
+        break;
+      case FLOAT_255:
+        ret = Float.valueOf(is.readUnsignedByte());
+        break;
+      case FLOAT_SHORT:
+        ret = Float.valueOf(is.readShort());
+        break;
+      case FLOAT_FULL:
+        ret = Float.valueOf(is.readFloat());
+        break;
+      case DOUBLE_MINUS_1:
+        ret = Double.valueOf(-1);
+        break;
+      case DOUBLE_0:
+        ret = Double.valueOf(0);
+        break;
+      case DOUBLE_1:
+        ret = Double.valueOf(1);
+        break;
+      case DOUBLE_255:
+        ret = Double.valueOf(is.readUnsignedByte());
+        break;
+      case DOUBLE_SHORT:
+        ret = Double.valueOf(is.readShort());
+        break;
+      case DOUBLE_FULL:
+        ret = Double.valueOf(is.readDouble());
+        break;
+      case BIGINTEGER:
+        ret = new BigInteger(deserializeArrayByteInt(is));
+        break;
+      case BIGDECIMAL:
+        ret = new BigDecimal(new BigInteger(deserializeArrayByteInt(is)),
+            LongPacker.unpackInt(is));
+        break;
+      case STRING:
+        ret = deserializeString(is);
+        break;
+      case STRING_EMPTY:
+        ret = JDBMUtils.EMPTY_STRING;
+        break;
+
+      case CLASS:
+        ret = deserializeClass(is);
+        break;
+      case DATE:
+        ret = new Date(is.readLong());
+        break;
+      case UUID:
+        ret = deserializeUUID(is);
+        break;
+      case ARRAY_INT_B_255:
+        ret = deserializeArrayIntB255(is);
+        break;
+      case ARRAY_INT_B_INT:
+        ret = deserializeArrayIntBInt(is);
+        break;
+      case ARRAY_INT_S:
+        ret = deserializeArrayIntSInt(is);
+        break;
+      case ARRAY_INT_I:
+        ret = deserializeArrayIntIInt(is);
+        break;
+      case ARRAY_INT_PACKED:
+        ret = deserializeArrayIntPack(is);
+        break;
+      case ARRAY_LONG_B:
+        ret = deserializeArrayLongB(is);
+        break;
+      case ARRAY_LONG_S:
+        ret = deserializeArrayLongS(is);
+        break;
+      case ARRAY_LONG_I:
+        ret = deserializeArrayLongI(is);
+        break;
+      case ARRAY_LONG_L:
+        ret = deserializeArrayLongL(is);
+        break;
+      case ARRAY_LONG_PACKED:
+        ret = deserializeArrayLongPack(is);
+        break;
+      case ARRAYLIST_PACKED_LONG:
+        ret = deserializeArrayListPackedLong(is);
+        break;
+      case ARRAY_BYTE_INT:
+        ret = deserializeArrayByteInt(is);
+        break;
+      case LOCALE:
+        ret = new Locale(is.readUTF(), is.readUTF(), is.readUTF());
+        break;
+      case JDBMLINKEDLIST:
+        ret = LinkedList.deserialize(is, this);
+        break;
+      case HTREE:
+        ret = HTree.deserialize(is, this);
+        break;
+      case BTREE:
+        ret = BTree.readExternal(is, this);
+        break;
+      case BTREE_NODE_LEAF:
+        throw new InternalError("BPage header, wrong serializer used");
+      case BTREE_NODE_NONLEAF:
+        throw new InternalError("BPage header, wrong serializer used");
+      case JAVA_SERIALIZATION:
+        throw new InternalError(
+            "Wrong header, data were probably serialized with OutputStream, not with JDBM serialization");
+
+      case -1:
+        throw new EOFException();
+
+    }
+
+    if (ret != null || head == NULL) {
+      if (objectStack != null)
+        objectStack.add(ret);
+      return ret;
+    }
+
+    /** something else which needs object stack initialized */
+
+    if (objectStack == null)
+      objectStack = new FastArrayList();
+    int oldObjectStackSize = objectStack.size();
+
+    switch (head) {
+      case NORMAL:
+        ret = readObject(is, objectStack);
+        break;
+      case OBJECT_STACK:
+        ret = objectStack.get(LongPacker.unpackInt(is));
+        break;
+      case ARRAYLIST:
+        ret = deserializeArrayList(is, objectStack);
+        break;
+      case ARRAY_OBJECT:
+        ret = deserializeArrayObject(is, objectStack);
+        break;
+      case ARRAY_OBJECT_PACKED_LONG:
+        ret = deserializeArrayObjectPackedLong(is);
+        break;
+      case LINKEDLIST:
+        ret = deserializeLinkedList(is, objectStack);
+        break;
+      case TREESET:
+        ret = deserializeTreeSet(is, objectStack);
+        break;
+      case HASHSET:
+        ret = deserializeHashSet(is, objectStack);
+        break;
+      case LINKEDHASHSET:
+        ret = deserializeLinkedHashSet(is, objectStack);
+        break;
+      case VECTOR:
+        ret = deserializeVector(is, objectStack);
+        break;
+      case TREEMAP:
+        ret = deserializeTreeMap(is, objectStack);
+        break;
+      case HASHMAP:
+        ret = deserializeHashMap(is, objectStack);
+        break;
+      case IDENTITYHASHMAP:
+        ret = deserializeIdentityHashMap(is, objectStack);
+        break;
+      case LINKEDHASHMAP:
+        ret = deserializeLinkedHashMap(is, objectStack);
+        break;
+      case HASHTABLE:
+        ret = deserializeHashtable(is, objectStack);
+        break;
+      case PROPERTIES:
+        ret = deserializeProperties(is, objectStack);
+        break;
+
+      default:
+        throw new InternalError("Unknown serialization header: " + head);
+    }
+
+    if (head != OBJECT_STACK && objectStack.size() == oldObjectStackSize) {
+      // check if object was not already added to stack as part of collection
+      objectStack.add(ret);
+    }
+
+    return ret;
+  }
+
+  private Class deserializeClass(DataInput is) throws IOException,
+      ClassNotFoundException {
+    String className = (String) deserialize(is);
+    Class cls = Class.forName(className);
+    return cls;
+  }
+
+  private byte[] deserializeArrayByteInt(DataInput is) throws IOException {
+    int size = LongPacker.unpackInt(is);
+    byte[] b = new byte[size];
+    is.readFully(b);
+    return b;
+  }
+
+  private long[] deserializeArrayLongL(DataInput is) throws IOException {
+    int size = LongPacker.unpackInt(is);
+    long[] ret = new long[size];
+    for (int i = 0; i < size; i++)
+      ret[i] = is.readLong();
+    return ret;
+  }
+
+  private long[] deserializeArrayLongI(DataInput is) throws IOException {
+    int size = LongPacker.unpackInt(is);
+    long[] ret = new long[size];
+    for (int i = 0; i < size; i++)
+      ret[i] = is.readInt();
+    return ret;
+  }
+
+  private long[] deserializeArrayLongS(DataInput is) throws IOException {
+    int size = LongPacker.unpackInt(is);
+    long[] ret = new long[size];
+    for (int i = 0; i < size; i++)
+      ret[i] = is.readShort();
+    return ret;
+  }
+
+  private long[] deserializeArrayLongB(DataInput is) throws IOException {
+    int size = LongPacker.unpackInt(is);
+    long[] ret = new long[size];
+    for (int i = 0; i < size; i++) {
+      ret[i] = is.readUnsignedByte();
+      if (ret[i] < 0)
+        throw new EOFException();
+    }
+    return ret;
+  }
+
+  private int[] deserializeArrayIntIInt(DataInput is) throws IOException {
+    int size = LongPacker.unpackInt(is);
+    int[] ret = new int[size];
+    for (int i = 0; i < size; i++)
+      ret[i] = is.readInt();
+    return ret;
+  }
+
+  private int[] deserializeArrayIntSInt(DataInput is) throws IOException {
+    int size = LongPacker.unpackInt(is);
+    int[] ret = new int[size];
+    for (int i = 0; i < size; i++)
+      ret[i] = is.readShort();
+    return ret;
+  }
+
+  private int[] deserializeArrayIntBInt(DataInput is) throws IOException {
+    int size = LongPacker.unpackInt(is);
+    int[] ret = new int[size];
+    for (int i = 0; i < size; i++) {
+      ret[i] = is.readUnsignedByte();
+      if (ret[i] < 0)
+        throw new EOFException();
+    }
+    return ret;
+  }
+
+  private int[] deserializeArrayIntPack(DataInput is) throws IOException {
+    int size = LongPacker.unpackInt(is);
+    if (size < 0)
+      throw new EOFException();
+
+    int[] ret = new int[size];
+    for (int i = 0; i < size; i++) {
+      ret[i] = LongPacker.unpackInt(is);
+    }
+    return ret;
+  }
+
+  private long[] deserializeArrayLongPack(DataInput is) throws IOException {
+    int size = LongPacker.unpackInt(is);
+    if (size < 0)
+      throw new EOFException();
+
+    long[] ret = new long[size];
+    for (int i = 0; i < size; i++) {
+      ret[i] = LongPacker.unpackLong(is);
+    }
+    return ret;
+  }
+
+  private UUID deserializeUUID(DataInput is) throws IOException {
+    return new UUID(is.readLong(), is.readLong());
+  }
+
+  private int[] deserializeArrayIntB255(DataInput is) throws IOException {
+    int size = is.readUnsignedByte();
+    if (size < 0)
+      throw new EOFException();
+
+    int[] ret = new int[size];
+    for (int i = 0; i < size; i++) {
+      ret[i] = is.readUnsignedByte();
+      if (ret[i] < 0)
+        throw new EOFException();
+    }
+    return ret;
+  }
+
+  private Object[] deserializeArrayObject(DataInput is,
+      FastArrayList objectStack) throws IOException, ClassNotFoundException {
+    int size = LongPacker.unpackInt(is);
+    // Read class id for components
+    int classId = LongPacker.unpackInt(is);
+    Class clazz = classId2class.get(classId);
+    if (clazz == null)
+      clazz = Object.class;
+
+    Object[] s = (Object[]) Array.newInstance(clazz, size);
+    objectStack.add(s);
+    for (int i = 0; i < size; i++)
+      s[i] = deserialize(is, objectStack);
+    return s;
+  }
+
+  private Object[] deserializeArrayObjectPackedLong(DataInput is)
+      throws IOException, ClassNotFoundException {
+    int size = is.readUnsignedByte();
+    Object[] s = new Object[size];
+    for (int i = 0; i < size; i++) {
+      long l = LongPacker.unpackLong(is);
+      if (l == 0)
+        s[i] = null;
+      else
+        s[i] = Long.valueOf(l - 1);
+    }
+    return s;
+  }
+
+  private ArrayList<Object> deserializeArrayList(DataInput is,
+      FastArrayList objectStack) throws IOException, ClassNotFoundException {
+    int size = LongPacker.unpackInt(is);
+    ArrayList<Object> s = new ArrayList<Object>(size);
+    objectStack.add(s);
+    for (int i = 0; i < size; i++) {
+      s.add(deserialize(is, objectStack));
+    }
+    return s;
+  }
+
+  private ArrayList<Object> deserializeArrayListPackedLong(DataInput is)
+      throws IOException, ClassNotFoundException {
+    int size = is.readUnsignedByte();
+    if (size < 0)
+      throw new EOFException();
+
+    ArrayList<Object> s = new ArrayList<Object>(size);
+    for (int i = 0; i < size; i++) {
+      long l = LongPacker.unpackLong(is);
+      if (l == 0)
+        s.add(null);
+      else
+        s.add(Long.valueOf(l - 1));
+    }
+    return s;
+  }
+
+  private java.util.LinkedList deserializeLinkedList(DataInput is,
+      FastArrayList objectStack) throws IOException, ClassNotFoundException {
+    int size = LongPacker.unpackInt(is);
+    java.util.LinkedList s = new java.util.LinkedList();
+    objectStack.add(s);
+    for (int i = 0; i < size; i++)
+      s.add(deserialize(is, objectStack));
+    return s;
+  }
+
+  private Vector<Object> deserializeVector(DataInput is,
+      FastArrayList objectStack) throws IOException, ClassNotFoundException {
+    int size = LongPacker.unpackInt(is);
+    Vector<Object> s = new Vector<Object>(size);
+    objectStack.add(s);
+    for (int i = 0; i < size; i++)
+      s.add(deserialize(is, objectStack));
+    return s;
+  }
+
+  private HashSet<Object> deserializeHashSet(DataInput is,
+      FastArrayList objectStack) throws IOException, ClassNotFoundException {
+    int size = LongPacker.unpackInt(is);
+    HashSet<Object> s = new HashSet<Object>(size);
+    objectStack.add(s);
+    for (int i = 0; i < size; i++)
+      s.add(deserialize(is, objectStack));
+    return s;
+  }
+
+  private LinkedHashSet<Object> deserializeLinkedHashSet(DataInput is,
+      FastArrayList objectStack) throws IOException, ClassNotFoundException {
+    int size = LongPacker.unpackInt(is);
+    LinkedHashSet<Object> s = new LinkedHashSet<Object>(size);
+    objectStack.add(s);
+    for (int i = 0; i < size; i++)
+      s.add(deserialize(is, objectStack));
+    return s;
+  }
+
+  private TreeSet<Object> deserializeTreeSet(DataInput is,
+      FastArrayList objectStack) throws IOException, ClassNotFoundException {
+    int size = LongPacker.unpackInt(is);
+    TreeSet<Object> s = new TreeSet<Object>();
+    objectStack.add(s);
+    Comparator comparator = (Comparator) deserialize(is, objectStack);
+    if (comparator != null)
+      s = new TreeSet<Object>(comparator);
+
+    for (int i = 0; i < size; i++)
+      s.add(deserialize(is, objectStack));
+    return s;
+  }
+
+  private TreeMap<Object, Object> deserializeTreeMap(DataInput is,
+      FastArrayList objectStack) throws IOException, ClassNotFoundException {
+    int size = LongPacker.unpackInt(is);
+
+    TreeMap<Object, Object> s = new TreeMap<Object, Object>();
+    objectStack.add(s);
+    Comparator comparator = (Comparator) deserialize(is, objectStack);
+    if (comparator != null)
+      s = new TreeMap<Object, Object>(comparator);
+    for (int i = 0; i < size; i++)
+      s.put(deserialize(is, objectStack), deserialize(is, objectStack));
+    return s;
+  }
+
+  private HashMap<Object, Object> deserializeHashMap(DataInput is,
+      FastArrayList objectStack) throws IOException, ClassNotFoundException {
+    int size = LongPacker.unpackInt(is);
+
+    HashMap<Object, Object> s = new HashMap<Object, Object>(size);
+    objectStack.add(s);
+    for (int i = 0; i < size; i++)
+      s.put(deserialize(is, objectStack), deserialize(is, objectStack));
+    return s;
+  }
+
+  private IdentityHashMap<Object, Object> deserializeIdentityHashMap(
+      DataInput is, FastArrayList objectStack) throws IOException,
+      ClassNotFoundException {
+    int size = LongPacker.unpackInt(is);
+
+    IdentityHashMap<Object, Object> s = new IdentityHashMap<Object, Object>(
+        size);
+    objectStack.add(s);
+    for (int i = 0; i < size; i++)
+      s.put(deserialize(is, objectStack), deserialize(is, objectStack));
+    return s;
+  }
+
+  private LinkedHashMap<Object, Object> deserializeLinkedHashMap(DataInput is,
+      FastArrayList objectStack) throws IOException, ClassNotFoundException {
+    int size = LongPacker.unpackInt(is);
+
+    LinkedHashMap<Object, Object> s = new LinkedHashMap<Object, Object>(size);
+    objectStack.add(s);
+    for (int i = 0; i < size; i++)
+      s.put(deserialize(is, objectStack), deserialize(is, objectStack));
+    return s;
+  }
+
+  private Hashtable<Object, Object> deserializeHashtable(DataInput is,
+      FastArrayList objectStack) throws IOException, ClassNotFoundException {
+    int size = LongPacker.unpackInt(is);
+
+    Hashtable<Object, Object> s = new Hashtable<Object, Object>(size);
+    objectStack.add(s);
+    for (int i = 0; i < size; i++)
+      s.put(deserialize(is, objectStack), deserialize(is, objectStack));
+    return s;
+  }
+
+  private Properties deserializeProperties(DataInput is,
+      FastArrayList objectStack) throws IOException, ClassNotFoundException {
+    int size = LongPacker.unpackInt(is);
+
+    Properties s = new Properties();
+    objectStack.add(s);
+    for (int i = 0; i < size; i++)
+      s.put(deserialize(is, objectStack), deserialize(is, objectStack));
+    return s;
+  }
+
+  /**
+   * Utility class similar to ArrayList, but with fast identity search.
+   */
+  static class FastArrayList<K> {
+
+    private int size = 0;
+    private K[] elementData = (K[]) new Object[8];
+
+    K get(int index) {
+      if (index >= size)
+        throw new IndexOutOfBoundsException();
+      return elementData[index];
+    }
+
+    void add(K o) {
+      if (elementData.length == size) {
+        // grow array if necessary
+        elementData = Arrays.copyOf(elementData, elementData.length * 2);
+      }
+
+      elementData[size] = o;
+      size++;
+    }
+
+    int size() {
+      return size;
+    }
+
+    /**
+     * This method is reason why ArrayList is not used. Search an item in list
+     * and returns its index. It uses identity rather than 'equalsTo' One could
+     * argue that TreeMap should be used instead, but we do not expect large
+     * object trees. This search is VERY FAST compared to Maps, it does not
+     * allocate new instances or uses method calls.
+     * 
+     * @param obj
+     * @return index of object in list or -1 if not found
+     */
+    int identityIndexOf(Object obj) {
+      for (int i = 0; i < size; i++) {
+        if (obj == elementData[i])
+          return i;
+      }
+      return -1;
+    }
+
+  }
+
+}

Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/SerializationHeader.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/SerializationHeader.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/SerializationHeader.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/SerializationHeader.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+/**
+ * Header byte, is used at start of each record to indicate data type WARNING
+ * !!! values below must be unique !!!!!
+ */
+public interface SerializationHeader {
+
+  final static int NULL = 0;
+  final static int NORMAL = 1;
+  final static int BOOLEAN_TRUE = 2;
+  final static int BOOLEAN_FALSE = 3;
+  final static int INTEGER_MINUS_1 = 4;
+  final static int INTEGER_0 = 5;
+  final static int INTEGER_1 = 6;
+  final static int INTEGER_2 = 7;
+  final static int INTEGER_3 = 8;
+  final static int INTEGER_4 = 9;
+  final static int INTEGER_5 = 10;
+  final static int INTEGER_6 = 11;
+  final static int INTEGER_7 = 12;
+  final static int INTEGER_8 = 13;
+  final static int INTEGER_255 = 14;
+  final static int INTEGER_PACK_NEG = 15;
+  final static int INTEGER_PACK = 16;
+  final static int LONG_MINUS_1 = 17;
+  final static int LONG_0 = 18;
+  final static int LONG_1 = 19;
+  final static int LONG_2 = 20;
+  final static int LONG_3 = 21;
+  final static int LONG_4 = 22;
+  final static int LONG_5 = 23;
+  final static int LONG_6 = 24;
+  final static int LONG_7 = 25;
+  final static int LONG_8 = 26;
+  final static int LONG_PACK_NEG = 27;
+  final static int LONG_PACK = 28;
+  final static int LONG_255 = 29;
+  final static int LONG_MINUS_MAX = 30;
+  final static int SHORT_MINUS_1 = 31;
+  final static int SHORT_0 = 32;
+  final static int SHORT_1 = 33;
+  final static int SHORT_255 = 34;
+  final static int SHORT_FULL = 35;
+  final static int BYTE_MINUS_1 = 36;
+  final static int BYTE_0 = 37;
+  final static int BYTE_1 = 38;
+  final static int BYTE_FULL = 39;
+  final static int CHAR = 40;
+  final static int FLOAT_MINUS_1 = 41;
+  final static int FLOAT_0 = 42;
+  final static int FLOAT_1 = 43;
+  final static int FLOAT_255 = 44;
+  final static int FLOAT_SHORT = 45;
+  final static int FLOAT_FULL = 46;
+  final static int DOUBLE_MINUS_1 = 47;
+  final static int DOUBLE_0 = 48;
+  final static int DOUBLE_1 = 49;
+  final static int DOUBLE_255 = 50;
+  final static int DOUBLE_SHORT = 51;
+  final static int DOUBLE_FULL = 52;
+  final static int DOUBLE_ARRAY = 53;
+  final static int BIGDECIMAL = 54;
+  final static int BIGINTEGER = 55;
+  final static int FLOAT_ARRAY = 56;
+  final static int INTEGER_MINUS_MAX = 57;
+  final static int SHORT_ARRAY = 58;
+  final static int BOOLEAN_ARRAY = 59;
+
+  final static int ARRAY_INT_B_255 = 60;
+  final static int ARRAY_INT_B_INT = 61;
+  final static int ARRAY_INT_S = 62;
+  final static int ARRAY_INT_I = 63;
+  final static int ARRAY_INT_PACKED = 64;
+
+  final static int ARRAY_LONG_B = 65;
+  final static int ARRAY_LONG_S = 66;
+  final static int ARRAY_LONG_I = 67;
+  final static int ARRAY_LONG_L = 68;
+  final static int ARRAY_LONG_PACKED = 69;
+
+  final static int CHAR_ARRAY = 70;
+  final static int ARRAY_BYTE_INT = 71;
+
+  final static int NOTUSED_ARRAY_OBJECT_255 = 72;
+  final static int ARRAY_OBJECT = 73;
+  // special cases for BTree values which stores references
+  final static int ARRAY_OBJECT_PACKED_LONG = 74;
+  final static int ARRAYLIST_PACKED_LONG = 75;
+
+  final static int STRING_EMPTY = 101;
+  final static int NOTUSED_STRING_255 = 102;
+  final static int STRING = 103;
+  final static int NOTUSED_ARRAYLIST_255 = 104;
+  final static int ARRAYLIST = 105;
+
+  final static int NOTUSED_TREEMAP_255 = 106;
+  final static int TREEMAP = 107;
+  final static int NOTUSED_HASHMAP_255 = 108;
+  final static int HASHMAP = 109;
+  final static int NOTUSED_LINKEDHASHMAP_255 = 110;
+  final static int LINKEDHASHMAP = 111;
+
+  final static int NOTUSED_TREESET_255 = 112;
+  final static int TREESET = 113;
+  final static int NOTUSED_HASHSET_255 = 114;
+  final static int HASHSET = 115;
+  final static int NOTUSED_LINKEDHASHSET_255 = 116;
+  final static int LINKEDHASHSET = 117;
+  final static int NOTUSED_LINKEDLIST_255 = 118;
+  final static int LINKEDLIST = 119;
+
+  final static int NOTUSED_VECTOR_255 = 120;
+  final static int VECTOR = 121;
+  final static int IDENTITYHASHMAP = 122;
+  final static int HASHTABLE = 123;
+  final static int LOCALE = 124;
+  final static int PROPERTIES = 125;
+
+  final static int CLASS = 126;
+  final static int DATE = 127;
+  final static int UUID = 128;
+
+  static final int JDBMLINKEDLIST = 159;
+  static final int HTREE = 160;
+
+  final static int BTREE = 161;
+
+  static final int BTREE_NODE_LEAF = 162;
+  static final int BTREE_NODE_NONLEAF = 163;
+  static final int HTREE_BUCKET = 164;
+  static final int HTREE_DIRECTORY = 165;
+  /**
+   * used for reference to already serialized object in object graph
+   */
+  static final int OBJECT_STACK = 166;
+  static final int JAVA_SERIALIZATION = 172;
+
+}

Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/Serializer.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/Serializer.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/Serializer.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/Serializer.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.jdbm;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Interface used to provide a serialization mechanism other than a class'
+ * normal serialization.
+ */
+public interface Serializer<A> {
+
+  /**
+   * Serialize the content of an object into a byte array.
+   * 
+   * @param out ObjectOutput to save object into
+   * @param obj Object to serialize
+   */
+  public void serialize(DataOutput out, A obj) throws IOException;
+
+  /**
+   * Deserialize the content of an object from a byte array.
+   * 
+   * @param in to read serialized data from
+   * @return deserialized object
+   * @throws IOException
+   * @throws ClassNotFoundException
+   */
+  public A deserialize(DataInput in) throws IOException, ClassNotFoundException;
+
+}

Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/Storage.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/Storage.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/Storage.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/Storage.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public interface Storage {
+
+  /**
+   * Bite shift used to calculate page size. If you want to modify page size, do
+   * it here.
+   * 
+   * 1<<9 = 512 1<<10 = 1024 1<<11 = 2048 1<<12 = 4096
+   */
+  int PAGE_SIZE_SHIFT = 12;
+
+  /**
+   * the lenght of single page.
+   * <p>
+   * !!! DO NOT MODIFY THI DIRECTLY !!!
+   */
+  int PAGE_SIZE = 1 << PAGE_SIZE_SHIFT;
+
+  /**
+   * use 'val & OFFSET_MASK' to quickly get offset within the page;
+   */
+  long OFFSET_MASK = 0xFFFFFFFFFFFFFFFFL >>> (64 - Storage.PAGE_SIZE_SHIFT);
+
+  void write(long pageNumber, ByteBuffer data) throws IOException;
+
+  ByteBuffer read(long pageNumber) throws IOException;
+
+  void forceClose() throws IOException;
+
+  boolean isReadonly();
+
+  DataInputStream readTransactionLog();
+
+  void deleteTransactionLog();
+
+  void sync() throws IOException;
+
+  DataOutputStream openTransactionLog() throws IOException;
+
+  void deleteAllFiles() throws IOException;
+}

Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/StorageDisk.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/StorageDisk.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/StorageDisk.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/StorageDisk.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.OverlappingFileLockException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Storage which used files on disk to store data
+ */
+public final class StorageDisk implements Storage {
+
+  private ArrayList<RandomAccessFile> rafs = new ArrayList<RandomAccessFile>();
+  private ArrayList<RandomAccessFile> rafsTranslation = new ArrayList<RandomAccessFile>();
+
+  private String fileName;
+
+  private boolean readonly;
+
+  public StorageDisk(String fileName, boolean readonly, boolean lockingDisabled)
+      throws IOException {
+    this.fileName = fileName;
+    this.readonly = readonly;
+    // make sure first file can be opened
+    // lock it
+    try {
+      if (!readonly && !lockingDisabled)
+        getRaf(0).getChannel().tryLock();
+    } catch (IOException e) {
+      throw new IOException("Could not lock DB file: " + fileName, e);
+    } catch (OverlappingFileLockException e) {
+      throw new IOException("Could not lock DB file: " + fileName, e);
+    }
+
+  }
+
+  RandomAccessFile getRaf(long pageNumber) throws IOException {
+
+    int fileNumber = (int) (Math.abs(pageNumber) / StorageDiskMapped.PAGES_PER_FILE);
+
+    List<RandomAccessFile> c = pageNumber >= 0 ? rafs : rafsTranslation;
+
+    // increase capacity of array lists if needed
+    for (int i = c.size(); i <= fileNumber; i++) {
+      c.add(null);
+    }
+
+    RandomAccessFile ret = c.get(fileNumber);
+    if (ret == null) {
+      String name = StorageDiskMapped.makeFileName(fileName, pageNumber,
+          fileNumber);
+      ret = new RandomAccessFile(name, readonly ? "r" : "rw");
+      c.set(fileNumber, ret);
+    }
+    return ret;
+
+  }
+
+  @Override
+  public void write(long pageNumber, ByteBuffer data) throws IOException {
+    if (data.capacity() != PAGE_SIZE)
+      throw new IllegalArgumentException();
+
+    long offset = pageNumber * PAGE_SIZE;
+
+    RandomAccessFile file = getRaf(pageNumber);
+
+    file.seek(Math.abs(offset % (StorageDiskMapped.PAGES_PER_FILE * PAGE_SIZE)));
+
+    file.write(data.array());
+  }
+
+  @Override
+  public ByteBuffer read(long pageNumber) throws IOException {
+
+    long offset = pageNumber * PAGE_SIZE;
+    ByteBuffer buffer = ByteBuffer.allocate(PAGE_SIZE);
+
+    RandomAccessFile file = getRaf(pageNumber);
+    file.seek(Math.abs(offset % (StorageDiskMapped.PAGES_PER_FILE * PAGE_SIZE)));
+    int remaining = buffer.limit();
+    int pos = 0;
+    while (remaining > 0) {
+      int read = file.read(buffer.array(), pos, remaining);
+      if (read == -1) {
+        System
+            .arraycopy(PageFile.CLEAN_DATA, 0, buffer.array(), pos, remaining);
+        break;
+      }
+      remaining -= read;
+      pos += read;
+    }
+    return buffer;
+  }
+
+  static final String transaction_log_file_extension = ".t";
+
+  @Override
+  public DataOutputStream openTransactionLog() throws IOException {
+    String logName = fileName + transaction_log_file_extension;
+    final FileOutputStream fileOut = new FileOutputStream(logName);
+    return new DataOutputStream(new BufferedOutputStream(fileOut)) {
+
+      // default implementation of flush on FileOutputStream does nothing,
+      // so we use little workaround to make sure that data were really flushed
+      @Override
+      public void flush() throws IOException {
+        super.flush();
+        fileOut.flush();
+        fileOut.getFD().sync();
+      }
+    };
+  }
+
+  @Override
+  public void deleteAllFiles() {
+    deleteTransactionLog();
+    StorageDiskMapped.deleteFiles(fileName);
+  }
+
+  /**
+   * Synchronizes the file.
+   */
+  @Override
+  public void sync() throws IOException {
+    for (RandomAccessFile file : rafs)
+      if (file != null)
+        file.getFD().sync();
+    for (RandomAccessFile file : rafsTranslation)
+      if (file != null)
+        file.getFD().sync();
+  }
+
+  @Override
+  public void forceClose() throws IOException {
+    for (RandomAccessFile f : rafs) {
+      if (f != null)
+        f.close();
+    }
+    rafs = null;
+    for (RandomAccessFile f : rafsTranslation) {
+      if (f != null)
+        f.close();
+    }
+    rafsTranslation = null;
+  }
+
+  @Override
+  public DataInputStream readTransactionLog() {
+
+    File logFile = new File(fileName + transaction_log_file_extension);
+    if (!logFile.exists())
+      return null;
+    if (logFile.length() == 0) {
+      logFile.delete();
+      return null;
+    }
+
+    DataInputStream ois = null;
+    try {
+      ois = new DataInputStream(new BufferedInputStream(new FileInputStream(
+          logFile)));
+    } catch (FileNotFoundException e) {
+      // file should exists, we check for its presents just a miliseconds
+      // yearlier, anyway move on
+      return null;
+    }
+
+    try {
+      if (ois.readShort() != Magic.LOGFILE_HEADER)
+        throw new Error("Bad magic on log file");
+    } catch (IOException e) {
+      // corrupted/empty logfile
+      logFile.delete();
+      return null;
+    }
+    return ois;
+  }
+
+  @Override
+  public void deleteTransactionLog() {
+    File logFile = new File(fileName + transaction_log_file_extension);
+    if (logFile.exists())
+      logFile.delete();
+  }
+
+  @Override
+  public boolean isReadonly() {
+    return false;
+  }
+}

Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/StorageDiskMapped.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/StorageDiskMapped.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/StorageDiskMapped.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/StorageDiskMapped.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.OverlappingFileLockException;
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.List;
+
+/**
+ * Disk storage which uses mapped buffers
+ */
+public final class StorageDiskMapped implements Storage {
+
+  static final String IDR = ".i";
+
+  static final String DBR = ".d";
+
+  /**
+   * Maximal number of pages in single file. Calculated so that each file will
+   * have 1 GB
+   */
+  final static long PAGES_PER_FILE = (1024 * 1024 * 1024) >>> Storage.PAGE_SIZE_SHIFT;
+
+  private ArrayList<FileChannel> channels = new ArrayList<FileChannel>();
+  private ArrayList<FileChannel> channelsTranslation = new ArrayList<FileChannel>();
+  private IdentityHashMap<FileChannel, MappedByteBuffer> buffers = new IdentityHashMap<FileChannel, MappedByteBuffer>();
+
+  private String fileName;
+  private boolean transactionsDisabled;
+  private boolean readonly;
+
+  public StorageDiskMapped(String fileName, boolean readonly,
+      boolean transactionsDisabled, boolean lockingDisabled) throws IOException {
+    this.fileName = fileName;
+    this.transactionsDisabled = transactionsDisabled;
+    this.readonly = readonly;
+    // make sure first file can be opened
+    // lock it
+    try {
+      if (!lockingDisabled)
+        getChannel(0).lock();
+    } catch (IOException e) {
+      throw new IOException("Could not lock DB file: " + fileName, e);
+    } catch (OverlappingFileLockException e) {
+      throw new IOException("Could not lock DB file: " + fileName, e);
+    }
+
+  }
+
+  private FileChannel getChannel(long pageNumber) throws IOException {
+    int fileNumber = (int) (Math.abs(pageNumber) / PAGES_PER_FILE);
+
+    List<FileChannel> c = pageNumber >= 0 ? channels : channelsTranslation;
+
+    // increase capacity of array lists if needed
+    for (int i = c.size(); i <= fileNumber; i++) {
+      c.add(null);
+    }
+
+    FileChannel ret = c.get(fileNumber);
+    if (ret == null) {
+      String name = makeFileName(fileName, pageNumber, fileNumber);
+      ret = new RandomAccessFile(name, "rw").getChannel();
+      c.set(fileNumber, ret);
+      buffers.put(ret, ret.map(FileChannel.MapMode.READ_WRITE, 0, ret.size()));
+    }
+    return ret;
+  }
+
+  static String makeFileName(String fileName, long pageNumber, int fileNumber) {
+    return fileName + (pageNumber >= 0 ? DBR : IDR) + "." + fileNumber;
+  }
+
+  @Override
+  public void write(long pageNumber, ByteBuffer data) throws IOException {
+    if (transactionsDisabled && data.isDirect()) {
+      // if transactions are disabled and this buffer is direct,
+      // changes written into buffer are directly reflected in file.
+      // so there is no need to write buffer second time
+      return;
+    }
+
+    FileChannel f = getChannel(pageNumber);
+    int offsetInFile = (int) ((Math.abs(pageNumber) % PAGES_PER_FILE) * PAGE_SIZE);
+    MappedByteBuffer b = buffers.get(f);
+    if (b.limit() <= offsetInFile) {
+
+      // remapping buffer for each newly added page would be slow,
+      // so allocate new size in chunks
+      int increment = Math.min(PAGE_SIZE * 1024, offsetInFile / 10);
+      increment -= increment % PAGE_SIZE;
+
+      long newFileSize = offsetInFile + PAGE_SIZE + increment;
+      newFileSize = Math.min(PAGES_PER_FILE * PAGE_SIZE, newFileSize);
+
+      // expand file size
+      f.position(newFileSize - 1);
+      f.write(ByteBuffer.allocate(1));
+      // unmap old buffer
+      unmapBuffer(b);
+      // remap buffer
+      b = f.map(FileChannel.MapMode.READ_WRITE, 0, newFileSize);
+      buffers.put(f, b);
+    }
+
+    // write into buffer
+    b.position(offsetInFile);
+    data.rewind();
+    b.put(data);
+  }
+
+  private static void unmapBuffer(MappedByteBuffer b) {
+    // trying to GC, because cleaner is no public and stable API
+    System.gc();
+    // if (b != null) {
+    // Cleaner cleaner = ((sun.nio.ch.DirectBuffer) b).cleaner();
+    // if (cleaner != null)
+    // cleaner.clean();
+    // }
+  }
+
+  @Override
+  public ByteBuffer read(long pageNumber) throws IOException {
+    FileChannel f = getChannel(pageNumber);
+    int offsetInFile = (int) ((Math.abs(pageNumber) % PAGES_PER_FILE) * PAGE_SIZE);
+    MappedByteBuffer b = buffers.get(f);
+
+    if (b == null) { // not mapped yet
+      b = f.map(FileChannel.MapMode.READ_WRITE, 0, f.size());
+    }
+
+    // check buffers size
+    if (b.limit() <= offsetInFile) {
+      // file is smaller, return empty data
+      return ByteBuffer.wrap(PageFile.CLEAN_DATA).asReadOnlyBuffer();
+    }
+
+    b.position(offsetInFile);
+    ByteBuffer ret = b.slice();
+    ret.limit(PAGE_SIZE);
+    if (!transactionsDisabled || readonly) {
+      // changes written into buffer will be directly written into file
+      // so we need to protect buffer from modifications
+      ret = ret.asReadOnlyBuffer();
+    }
+    return ret;
+  }
+
+  @Override
+  public void forceClose() throws IOException {
+    for (FileChannel f : channels) {
+      if (f == null)
+        continue;
+      f.close();
+      unmapBuffer(buffers.get(f));
+    }
+    for (FileChannel f : channelsTranslation) {
+      if (f == null)
+        continue;
+      f.close();
+      unmapBuffer(buffers.get(f));
+    }
+
+    channels = null;
+    channelsTranslation = null;
+    buffers = null;
+  }
+
+  @Override
+  public void sync() throws IOException {
+    for (MappedByteBuffer b : buffers.values()) {
+      b.force();
+    }
+  }
+
+  @Override
+  public DataOutputStream openTransactionLog() throws IOException {
+    String logName = fileName + StorageDisk.transaction_log_file_extension;
+    final FileOutputStream fileOut = new FileOutputStream(logName);
+    return new DataOutputStream(new BufferedOutputStream(fileOut)) {
+
+      // default implementation of flush on FileOutputStream does nothing,
+      // so we use little workaround to make sure that data were really flushed
+      @Override
+      public void flush() throws IOException {
+        super.flush();
+        fileOut.flush();
+        fileOut.getFD().sync();
+      }
+    };
+  }
+
+  @Override
+  public void deleteAllFiles() throws IOException {
+    deleteTransactionLog();
+    deleteFiles(fileName);
+  }
+
+  static void deleteFiles(String fileName) {
+    for (int i = 0; true; i++) {
+      String name = makeFileName(fileName, +1, i);
+      File f = new File(name);
+      boolean exists = f.exists();
+      if (exists && !f.delete())
+        f.deleteOnExit();
+      if (!exists)
+        break;
+    }
+    for (int i = 0; true; i++) {
+      String name = makeFileName(fileName, -1, i);
+      File f = new File(name);
+      boolean exists = f.exists();
+      if (exists && !f.delete())
+        f.deleteOnExit();
+      if (!exists)
+        break;
+    }
+  }
+
+  @Override
+  public DataInputStream readTransactionLog() {
+
+    File logFile = new File(fileName
+        + StorageDisk.transaction_log_file_extension);
+    if (!logFile.exists())
+      return null;
+    if (logFile.length() == 0) {
+      logFile.delete();
+      return null;
+    }
+
+    DataInputStream ois = null;
+    try {
+      ois = new DataInputStream(new BufferedInputStream(new FileInputStream(
+          logFile)));
+    } catch (FileNotFoundException e) {
+      // file should exists, we check for its presents just a miliseconds
+      // yearlier, anyway move on
+      return null;
+    }
+
+    try {
+      if (ois.readShort() != Magic.LOGFILE_HEADER)
+        throw new Error("Bad magic on log file");
+    } catch (IOException e) {
+      // corrupted/empty logfile
+      logFile.delete();
+      return null;
+    }
+    return ois;
+  }
+
+  @Override
+  public void deleteTransactionLog() {
+    File logFile = new File(fileName
+        + StorageDisk.transaction_log_file_extension);
+    if (logFile.exists())
+      logFile.delete();
+  }
+
+  @Override
+  public boolean isReadonly() {
+    return readonly;
+  }
+
+}

Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/StorageMemory.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/StorageMemory.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/StorageMemory.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/StorageMemory.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOError;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Storage which keeps all data in memory. Data are lost after storage is
+ * closed.
+ */
+public final class StorageMemory implements Storage {
+
+  private LongHashMap<byte[]> pages = new LongHashMap<byte[]>();
+  private boolean transactionsDisabled;
+
+  StorageMemory(boolean transactionsDisabled) {
+    this.transactionsDisabled = transactionsDisabled;
+  }
+
+  @Override
+  public ByteBuffer read(long pageNumber) throws IOException {
+
+    byte[] data = pages.get(pageNumber);
+    if (data == null) {
+      // out of bounds, so just return empty data
+      return ByteBuffer.wrap(PageFile.CLEAN_DATA).asReadOnlyBuffer();
+    } else {
+      ByteBuffer b = ByteBuffer.wrap(data);
+      if (!transactionsDisabled)
+        return b.asReadOnlyBuffer();
+      else
+        return b;
+    }
+
+  }
+
+  @Override
+  public void write(long pageNumber, ByteBuffer data) throws IOException {
+    if (data.capacity() != PAGE_SIZE)
+      throw new IllegalArgumentException();
+
+    byte[] b = pages.get(pageNumber);
+
+    if (transactionsDisabled && data.hasArray() && data.array() == b) {
+      // already putted directly into array
+      return;
+    }
+
+    if (b == null)
+      b = new byte[PAGE_SIZE];
+
+    data.position(0);
+    data.get(b, 0, PAGE_SIZE);
+    pages.put(pageNumber, b);
+  }
+
+  @Override
+  public void sync() throws IOException {
+  }
+
+  @Override
+  public void forceClose() throws IOException {
+    pages = null;
+  }
+
+  private ByteArrayOutputStream transLog;
+
+  @Override
+  public DataInputStream readTransactionLog() {
+    if (transLog == null)
+      return null;
+    DataInputStream ret = new DataInputStream(new ByteArrayInputStream(
+        transLog.toByteArray()));
+    // read stream header
+    try {
+      ret.readShort();
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+    return ret;
+  }
+
+  @Override
+  public void deleteTransactionLog() {
+    transLog = null;
+  }
+
+  @Override
+  public DataOutputStream openTransactionLog() throws IOException {
+    if (transLog == null)
+      transLog = new ByteArrayOutputStream();
+    return new DataOutputStream(transLog);
+  }
+
+  @Override
+  public void deleteAllFiles() throws IOException {
+  }
+
+  @Override
+  public boolean isReadonly() {
+    return false;
+  }
+}

Added: hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/BTreeBench.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/BTreeBench.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/BTreeBench.java (added)
+++ hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/BTreeBench.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.io.IOException;
+import java.util.Enumeration;
+import java.util.Hashtable;
+
+/**
+ * Random insertion/removal test for B+Tree data structure.
+ */
+public class BTreeBench extends TestCaseWithTestFile {
+
+  DBAbstract db;
+
+  /**
+   * Test w/o compression or specialized key or value serializers.
+   * 
+   * @throws IOException
+   */
+  public void test_001() throws IOException {
+    db = newDBCache();
+    BTree<Long, Long> tree = BTree.createInstance(db);
+    doTest(db, tree, 5001);
+    db.close();
+  }
+
+  public static void doTest(DB db, BTree<Long, Long> tree, int ITERATIONS)
+      throws IOException {
+
+    long beginTime = System.currentTimeMillis();
+    Hashtable<Long, Long> hash = new Hashtable<Long, Long>();
+
+    for (int i = 0; i < ITERATIONS; i++) {
+      Long random = new Long(random(0, 64000));
+
+      if ((i % 5000) == 0) {
+        long elapsed = System.currentTimeMillis() - beginTime;
+        System.out.println("Iterations=" + i + " Objects=" + tree._entries
+            + ", elapsed=" + elapsed + "ms");
+        db.commit();
+      }
+      if (hash.get(random) == null) {
+        // System.out.println( "Insert " + random );
+        hash.put(random, random);
+        tree.insert(random, random, false);
+      } else {
+        // System.out.println( "Remove " + random );
+        hash.remove(random);
+        Object removed = tree.remove(random);
+        if ((removed == null) || (!removed.equals(random))) {
+          throw new IllegalStateException("Remove expected " + random + " got "
+              + removed);
+        }
+      }
+      // tree.assertOrdering();
+      compare(tree, hash);
+    }
+
+  }
+
+  static long random(int min, int max) {
+    return Math.round(Math.random() * (max - min)) + min;
+  }
+
+  static void compare(BTree<Long, Long> tree, Hashtable<Long, Long> hash)
+      throws IOException {
+    boolean failed = false;
+    Enumeration<Long> enumeration;
+
+    if (tree._entries != hash.size()) {
+      throw new IllegalStateException("Tree size " + tree._entries
+          + " Hash size " + hash.size());
+    }
+
+    enumeration = hash.keys();
+    while (enumeration.hasMoreElements()) {
+      Long key = enumeration.nextElement();
+      Long hashValue = hash.get(key);
+      Long treeValue = tree.get(key);
+      if (!hashValue.equals(treeValue)) {
+        System.out.println("Compare expected " + hashValue + " got "
+            + treeValue);
+        failed = true;
+      }
+    }
+    if (failed) {
+      throw new IllegalStateException("Compare failed");
+    }
+  }
+
+}

Added: hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/BTreeKeyCompressionTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/BTreeKeyCompressionTest.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/BTreeKeyCompressionTest.java (added)
+++ hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/BTreeKeyCompressionTest.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.SortedMap;
+
+public class BTreeKeyCompressionTest extends TestCaseWithTestFile {
+
+  static final long size = (long) 1e5;
+
+  public void testExpand() throws IOException {
+    long init = Long.MAX_VALUE - size * 2;
+    String file = newTestFile();
+    DB db = new DBStore(file, false, false, false);
+    SortedMap<Long, String> map = db.createTreeMap("aa");
+    for (long i = init; i < init + size; i++) {
+      map.put(i, "");
+    }
+    db.commit();
+    db.defrag(true);
+    db.close();
+    long fileSize = new File(file + ".dbr.0").length() / 1024;
+    System.out.println("file size: " + fileSize);
+    assertTrue("file is too big, compression failed", fileSize < 1000);
+  }
+
+  public void testCornersLimitsLong() throws IOException {
+    DB db = newDBCache();
+    SortedMap<Long, String> map = db.createTreeMap("aa");
+    ArrayList<Long> ll = new ArrayList<Long>();
+    for (Long i = Long.MIN_VALUE; i < Long.MIN_VALUE + 1000; i++) {
+      map.put(i, "");
+      ll.add(i);
+    }
+    for (Long i = -1000l; i < 1000; i++) {
+      map.put(i, "");
+      ll.add(i);
+    }
+    for (Long i = Long.MAX_VALUE - 1000; i <= Long.MAX_VALUE && i > 0; i++) {
+      map.put(i, "");
+      ll.add(i);
+    }
+
+    db.commit();
+
+    db.clearCache();
+    for (Long i : ll) {
+      assertTrue("failed for " + i, map.containsKey(i));
+    }
+
+    assertTrue(!map.containsKey(Long.valueOf(Long.MIN_VALUE + 1000)));
+    assertTrue(!map.containsKey(Long.valueOf(Long.MIN_VALUE + 1001)));
+    assertTrue(!map.containsKey(Long.valueOf(-1001L)));
+    assertTrue(!map.containsKey(Long.valueOf(-1002L)));
+    assertTrue(!map.containsKey(Long.valueOf(1001L)));
+    assertTrue(!map.containsKey(Long.valueOf(1002L)));
+    assertTrue(!map.containsKey(Long.valueOf(Long.MAX_VALUE - 1001)));
+    assertTrue(!map.containsKey(Long.valueOf(Long.MAX_VALUE - 1002)));
+
+    db.close();
+  }
+
+  public void testCornersLimitsInt() throws IOException {
+    DB db = newDBCache();
+    SortedMap<Integer, String> map = db.createTreeMap("aa");
+    ArrayList<Integer> ll = new ArrayList<Integer>();
+    for (Integer i = Integer.MIN_VALUE; i < Integer.MIN_VALUE + 1000; i++) {
+      map.put(new Integer(i), "");
+      ll.add(new Integer(i));
+    }
+    for (Integer i = -1000; i < 1000; i++) {
+      map.put(i, "");
+      ll.add(i);
+    }
+    for (Integer i = Integer.MAX_VALUE - 1000; i <= Integer.MAX_VALUE && i > 0; i++) {
+      map.put(i, "");
+      ll.add(i);
+    }
+
+    db.commit();
+
+    db.clearCache();
+    for (Integer i : ll) {
+      assertTrue("failed for " + i, map.containsKey(i));
+    }
+
+    assertTrue(!map.containsKey(Integer.valueOf(Integer.MIN_VALUE + 1000)));
+    assertTrue(!map.containsKey(Integer.valueOf(Integer.MIN_VALUE + 1001)));
+    assertTrue(!map.containsKey(Integer.valueOf(-1001)));
+    assertTrue(!map.containsKey(Integer.valueOf(-1002)));
+    assertTrue(!map.containsKey(Integer.valueOf(1001)));
+    assertTrue(!map.containsKey(Integer.valueOf(1002)));
+    assertTrue(!map.containsKey(Integer.valueOf(Integer.MAX_VALUE - 1001)));
+    assertTrue(!map.containsKey(Integer.valueOf(Integer.MAX_VALUE - 1002)));
+
+    db.close();
+  }
+
+  public void testStrings() throws IOException {
+    long init = Long.MAX_VALUE - size * 2;
+    String file = newTestFile();
+    DB db = new DBStore(file, false, false, false);
+    SortedMap<String, String> map = db.createTreeMap("aa");
+    for (long i = init; i < init + size / 10; i++) {
+      map.put("aaaaa" + i, "");
+    }
+    db.commit();
+    db.defrag(true);
+    db.close();
+    db = new DBStore(file, false, false, false);
+    map = db.getTreeMap("aa");
+    for (long i = init; i < init + size / 10; i++) {
+      assertTrue(map.containsKey("aaaaa" + i));
+    }
+
+    long fileSize = new File(file + ".dbr.0").length() / 1024;
+    System.out.println("file size with Strings: " + fileSize);
+    assertTrue("file is too big, compression failed", fileSize < 120);
+  }
+
+}

Added: hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/BTreeLeadingValuePackTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/BTreeLeadingValuePackTest.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/BTreeLeadingValuePackTest.java (added)
+++ hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/BTreeLeadingValuePackTest.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+public class BTreeLeadingValuePackTest extends TestCase {
+
+  public static class ByteArraySource {
+    byte[] last = new byte[0];
+    Random r;
+
+    public ByteArraySource(long seed) {
+      r = new Random(seed);
+      r.nextBytes(last);
+    }
+
+    public byte[] getBytesWithCommonPrefix(int len, int common) {
+      if (common > last.length)
+        common = last.length;
+      if (common > len)
+        common = len;
+
+      byte[] out = new byte[len];
+      System.arraycopy(last, 0, out, 0, common);
+      byte[] xtra = new byte[len - common];
+      r.nextBytes(xtra);
+      System.arraycopy(xtra, 0, out, common, xtra.length);
+
+      last = out;
+      return out;
+    }
+
+  }
+
+  private void doCompressUncompressTestFor(byte[][] groups) throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(baos);
+
+    // compress
+    for (int i = 0; i < groups.length; i++) {
+      BTreeNode.leadingValuePackWrite(dos, groups[i], i > 0 ? groups[i - 1]
+          : null, 0);
+    }
+
+    byte[] results = baos.toByteArray();
+
+    ByteArrayInputStream bais = new ByteArrayInputStream(results);
+    DataInputStream dis = new DataInputStream(bais);
+
+    byte[] previous = null;
+    for (int i = 0; i < groups.length; i++) {
+      previous = BTreeNode.leadingValuePackRead(dis, previous, 0);
+      assertTrue(Arrays.equals(groups[i], previous));
+    }
+
+  }
+
+  private byte[][] getIncrementingGroups(int groupCount, long seed,
+      int lenInit, int comInit, int lenIncr, int comIncr) {
+    ByteArraySource bap = new ByteArraySource(seed);
+    byte[][] groups = new byte[groupCount][];
+    for (int i = 0; i < groupCount; i++) {
+      groups[i] = bap.getBytesWithCommonPrefix(lenInit, comInit);
+      lenInit += lenIncr;
+      comInit += comIncr;
+    }
+    return groups;
+  }
+
+  public void testCompDecompEqualLenEqualCommon() throws IOException {
+    byte[][] groups = getIncrementingGroups(5, // number of groups
+        1000, // seed
+        50, // starting byte array length
+        5, // starting common bytes
+        0, // length increment
+        0 // common bytes increment
+    );
+
+    doCompressUncompressTestFor(groups);
+  }
+
+  public void testCompDecompEqualLenIncrCommon() throws IOException {
+    byte[][] groups = getIncrementingGroups(5, // number of groups
+        1000, // seed
+        50, // starting byte array length
+        5, // starting common bytes
+        0, // length increment
+        2 // common bytes increment
+    );
+
+    doCompressUncompressTestFor(groups);
+  }
+
+  public void testCompDecompEqualLenDecrCommon() throws IOException {
+    byte[][] groups = getIncrementingGroups(5, // number of groups
+        1000, // seed
+        50, // starting byte array length
+        40, // starting common bytes
+        0, // length increment
+        -2 // common bytes increment
+    );
+
+    doCompressUncompressTestFor(groups);
+  }
+
+  public void testCompDecompIncrLenEqualCommon() throws IOException {
+    byte[][] groups = getIncrementingGroups(5, // number of groups
+        1000, // seed
+        30, // starting byte array length
+        25, // starting common bytes
+        1, // length increment
+        0 // common bytes increment
+    );
+
+    doCompressUncompressTestFor(groups);
+  }
+
+  public void testCompDecompDecrLenEqualCommon() throws IOException {
+    byte[][] groups = getIncrementingGroups(5, // number of groups
+        1000, // seed
+        50, // starting byte array length
+        25, // starting common bytes
+        -1, // length increment
+        0 // common bytes increment
+    );
+
+    doCompressUncompressTestFor(groups);
+  }
+
+  public void testCompDecompNoCommon() throws IOException {
+    byte[][] groups = getIncrementingGroups(5, // number of groups
+        1000, // seed
+        50, // starting byte array length
+        0, // starting common bytes
+        -1, // length increment
+        0 // common bytes increment
+    );
+
+    doCompressUncompressTestFor(groups);
+  }
+
+  public void testCompDecompNullGroups() throws IOException {
+    byte[][] groups = getIncrementingGroups(5, // number of groups
+        1000, // seed
+        50, // starting byte array length
+        25, // starting common bytes
+        -1, // length increment
+        0 // common bytes increment
+    );
+
+    groups[2] = null;
+    groups[4] = null;
+
+    doCompressUncompressTestFor(groups);
+  }
+
+}