You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2012/09/19 13:52:24 UTC
svn commit: r1387533 [6/10] - in /hama/trunk: ./ core/
core/src/main/java/org/apache/hama/bsp/ graph/
graph/src/main/java/org/apache/hama/graph/ jdbm/ jdbm/src/ jdbm/src/main/
jdbm/src/main/java/ jdbm/src/main/java/org/ jdbm/src/main/java/org/apache/
j...
Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/Serialization.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/Serialization.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/Serialization.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/Serialization.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,1362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Hashtable;
+import java.util.IdentityHashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.Vector;
+
+/**
+ * Serialization util. It reduces serialized data size for most common java
+ * types.
+ * <p/>
+ * Common pattern is one byte header which identifies data type, then size is
+ * written (if required) and data.
+ * <p/>
+ * On unknown types normal java serialization is used.
+ * <p/>
+ * Header byte values below 180 are reserved by the author for future use. If
+ * you want to customize this class, use values above 180 to stay compatible
+ * with future updates.
+ */
+@SuppressWarnings({ "unchecked", "rawtypes" })
+public final class Serialization extends SerialClassInfo implements Serializer, SerializationHeader {
+
+ /** Name of the UTF-8 charset, kept as a shared constant. */
+ static final String UTF8 = "UTF-8";
+
+ /**
+  * Debug switch; when enabled, statistics are printed to STDOUT.
+  */
+ static final boolean DEBUG = false;
+
+ /**
+  * Creates a serializer for {@code db}, handing the class-info record id and
+  * the already known class descriptors to {@link SerialClassInfo}.
+  */
+ Serialization(DBAbstract db, long serialClassInfoRecid,
+     ArrayList<ClassInfo> classInfos) throws IOException {
+   super(db, serialClassInfoRecid, classInfos);
+ }
+
+ public Serialization() {
+ super(null, 0L, new ArrayList<ClassInfo>());
+ // Add java.lang.Object as registered class
+ registered.add(new ClassInfo(Object.class.getName(), new FieldInfo[] {},
+ false, false));
+ }
+
+ /**
+  * Serializes {@code obj} into a freshly allocated byte array.
+  *
+  * @param obj object to serialize, may be {@code null}
+  * @return the serialized bytes
+  * @throws IOException if serialization fails
+  */
+ public byte[] serialize(Object obj) throws IOException {
+   final DataInputOutput buffer = new DataInputOutput();
+   serialize(buffer, obj);
+   return buffer.toByteArray();
+ }
+
+ boolean isSerializable(Object obj) {
+ // TODO suboptimal code
+ try {
+ serialize(new DataOutputStream(new ByteArrayOutputStream()), obj);
+ return true;
+ } catch (Exception e) {
+ return false;
+ }
+ }
+
+ /**
+  * Serializes {@code obj} to {@code out} as a new top-level object graph, with
+  * no previously written objects to refer back to.
+  */
+ public void serialize(final DataOutput out, final Object obj)
+     throws IOException {
+   final FastArrayList freshGraph = null;
+   serialize(out, obj, freshGraph);
+ }
+
+ /**
+  * Serializes {@code obj} to {@code out}, remembering every object written in
+  * {@code objectStack}. An object that is already on the stack is written as
+  * an OBJECT_STACK back-reference (its stack index) instead of again.
+  * <p/>
+  * Simple values (boxed primitives, strings, primitive arrays, dates, ...) are
+  * handled before an object stack is needed; object arrays, collections, maps
+  * and arbitrary objects create the stack lazily when it is missing.
+  *
+  * @param out destination of the serialized bytes
+  * @param obj object to serialize, may be {@code null}
+  * @param objectStack objects already written in the current object graph, or
+  *          {@code null} when starting a new top-level object
+  * @throws IOException if writing to {@code out} fails
+  */
+ public void serialize(final DataOutput out, final Object obj,
+     FastArrayList objectStack) throws IOException {
+
+   // try to find object on stack if it exists
+   if (objectStack != null) {
+     int indexInObjectStack = objectStack.identityIndexOf(obj);
+     if (indexInObjectStack != -1) {
+       // object was already serialized, just write reference to it and return
+       out.write(OBJECT_STACK);
+       LongPacker.packInt(out, indexInObjectStack);
+       return;
+     }
+     // add this object to objectStack
+     objectStack.add(obj);
+   }
+
+   final Class clazz = obj != null ? obj.getClass() : null;
+
+   // first try to serialize object without initializing object stack
+   if (obj == null) {
+     out.write(NULL);
+     return;
+   } else if (clazz == Boolean.class) {
+     if (((Boolean) obj).booleanValue())
+       out.write(BOOLEAN_TRUE);
+     else
+       out.write(BOOLEAN_FALSE);
+     return;
+   } else if (clazz == Integer.class) {
+     final int val = (Integer) obj;
+     writeInteger(out, val);
+     return;
+   } else if (clazz == Double.class) {
+     double v = (Double) obj;
+     if (Double.doubleToRawLongBits(v) == Long.MIN_VALUE) {
+       // -0.0 compares equal to 0 and passes the range checks below, so the
+       // compact forms would read it back as +0.0; keep its sign.
+       out.write(DOUBLE_FULL);
+       out.writeDouble(v);
+     } else if (v == -1d)
+       out.write(DOUBLE_MINUS_1);
+     else if (v == 0d)
+       out.write(DOUBLE_0);
+     else if (v == 1d)
+       out.write(DOUBLE_1);
+     else if (v >= 0 && v <= 255 && (int) v == v) {
+       out.write(DOUBLE_255);
+       out.write((int) v);
+     } else if (v >= Short.MIN_VALUE && v <= Short.MAX_VALUE && (short) v == v) {
+       out.write(DOUBLE_SHORT);
+       out.writeShort((int) v);
+     } else {
+       out.write(DOUBLE_FULL);
+       out.writeDouble(v);
+     }
+     return;
+   } else if (clazz == Float.class) {
+     float v = (Float) obj;
+     if (Float.floatToRawIntBits(v) == Integer.MIN_VALUE) {
+       // -0.0f: same sign-preservation concern as for doubles above
+       out.write(FLOAT_FULL);
+       out.writeFloat(v);
+     } else if (v == -1f)
+       out.write(FLOAT_MINUS_1);
+     else if (v == 0f)
+       out.write(FLOAT_0);
+     else if (v == 1f)
+       out.write(FLOAT_1);
+     else if (v >= 0 && v <= 255 && (int) v == v) {
+       out.write(FLOAT_255);
+       out.write((int) v);
+     } else if (v >= Short.MIN_VALUE && v <= Short.MAX_VALUE && (short) v == v) {
+       out.write(FLOAT_SHORT);
+       out.writeShort((int) v);
+     } else {
+       out.write(FLOAT_FULL);
+       out.writeFloat(v);
+     }
+     return;
+   } else if (clazz == BigInteger.class) {
+     out.write(BIGINTEGER);
+     byte[] buf = ((BigInteger) obj).toByteArray();
+     serializeByteArrayInt(out, buf);
+     return;
+   } else if (clazz == BigDecimal.class) {
+     out.write(BIGDECIMAL);
+     BigDecimal d = (BigDecimal) obj;
+     serializeByteArrayInt(out, d.unscaledValue().toByteArray());
+     LongPacker.packInt(out, d.scale());
+     return;
+   } else if (clazz == Long.class) {
+     final long val = (Long) obj;
+     writeLong(out, val);
+     return;
+   } else if (clazz == Short.class) {
+     short val = (Short) obj;
+     if (val == -1)
+       out.write(SHORT_MINUS_1);
+     else if (val == 0)
+       out.write(SHORT_0);
+     else if (val == 1)
+       out.write(SHORT_1);
+     else if (val > 0 && val < 255) {
+       out.write(SHORT_255);
+       out.write(val);
+     } else {
+       out.write(SHORT_FULL);
+       out.writeShort(val);
+     }
+     return;
+   } else if (clazz == Byte.class) {
+     byte val = (Byte) obj;
+     if (val == -1)
+       out.write(BYTE_MINUS_1);
+     else if (val == 0)
+       out.write(BYTE_0);
+     else if (val == 1)
+       out.write(BYTE_1);
+     else {
+       // BYTE_FULL is followed by exactly one byte; SHORT_FULL would make the
+       // deserializer read a two-byte short and desynchronize the stream.
+       out.write(BYTE_FULL);
+       out.writeByte(val);
+     }
+     return;
+   } else if (clazz == Character.class) {
+     out.write(CHAR);
+     out.writeChar((Character) obj);
+     return;
+   } else if (clazz == String.class) {
+     String s = (String) obj;
+     if (s.length() == 0) {
+       out.write(STRING_EMPTY);
+     } else {
+       out.write(STRING);
+       serializeString(out, s);
+     }
+     return;
+   } else if (obj instanceof Class) {
+     out.write(CLASS);
+     serialize(out, ((Class) obj).getName());
+     return;
+   } else if (obj instanceof int[]) {
+     writeIntArray(out, (int[]) obj);
+     return;
+   } else if (obj instanceof long[]) {
+     writeLongArray(out, (long[]) obj);
+     return;
+   } else if (obj instanceof short[]) {
+     out.write(SHORT_ARRAY);
+     short[] a = (short[]) obj;
+     LongPacker.packInt(out, a.length);
+     for (short s : a)
+       out.writeShort(s);
+     return;
+   } else if (obj instanceof boolean[]) {
+     out.write(BOOLEAN_ARRAY);
+     boolean[] a = (boolean[]) obj;
+     LongPacker.packInt(out, a.length);
+     for (boolean s : a)
+       out.writeBoolean(s); // TODO pack 8 booleans to single byte
+     return;
+   } else if (obj instanceof double[]) {
+     out.write(DOUBLE_ARRAY);
+     double[] a = (double[]) obj;
+     LongPacker.packInt(out, a.length);
+     for (double s : a)
+       out.writeDouble(s);
+     return;
+   } else if (obj instanceof float[]) {
+     out.write(FLOAT_ARRAY);
+     float[] a = (float[]) obj;
+     LongPacker.packInt(out, a.length);
+     for (float s : a)
+       out.writeFloat(s);
+     return;
+   } else if (obj instanceof char[]) {
+     out.write(CHAR_ARRAY);
+     char[] a = (char[]) obj;
+     LongPacker.packInt(out, a.length);
+     for (char s : a)
+       out.writeChar(s);
+     return;
+   } else if (obj instanceof byte[]) {
+     byte[] b = (byte[]) obj;
+     out.write(ARRAY_BYTE_INT);
+     serializeByteArrayInt(out, b);
+     return;
+   } else if (clazz == Date.class) {
+     out.write(DATE);
+     out.writeLong(((Date) obj).getTime());
+     return;
+   } else if (clazz == UUID.class) {
+     out.write(UUID);
+     serializeUUID(out, (UUID) obj);
+     return;
+   } else if (clazz == BTree.class) {
+     out.write(BTREE);
+     ((BTree) obj).writeExternal(out);
+     return;
+   } else if (clazz == HTree.class) {
+     out.write(HTREE);
+     ((HTree) obj).serialize(out);
+     return;
+   } else if (clazz == LinkedList.class) {
+     out.write(JDBMLINKEDLIST);
+     ((LinkedList) obj).serialize(out);
+     return;
+   }
+
+   // classes below need an object stack, so initialize it if not already
+   // initialized
+   if (objectStack == null) {
+     objectStack = new FastArrayList();
+     objectStack.add(obj);
+   }
+
+   if (obj instanceof Object[]) {
+     Object[] b = (Object[]) obj;
+     // The packed-long form is always read back as a plain Object[], so only
+     // use it when the runtime type is exactly Object[]; typed arrays
+     // (String[], Long[], ...) go through ARRAY_OBJECT to keep their type.
+     boolean packableLongs = clazz == Object[].class && b.length <= 255;
+     for (int i = 0; packableLongs && i < b.length; i++)
+       packableLongs = isPackableLong(b[i]);
+
+     if (packableLongs) {
+       // packable Longs is special case, it is often used in JDBM to reference
+       // fields
+       out.write(ARRAY_OBJECT_PACKED_LONG);
+       out.write(b.length);
+       for (Object o : b)
+         writePackedLong(out, o);
+     } else {
+       out.write(ARRAY_OBJECT);
+       LongPacker.packInt(out, b.length);
+
+       // Write class id for components
+       Class<?> componentType = clazz.getComponentType();
+       registerClass(componentType);
+       // write class header
+       int classId = getClassId(componentType);
+       LongPacker.packInt(out, classId);
+
+       for (Object o : b)
+         serialize(out, o, objectStack);
+     }
+
+   } else if (clazz == ArrayList.class) {
+     ArrayList l = (ArrayList) obj;
+     // packable Longs is special case, it is often used in JDBM to reference
+     // fields
+     boolean packableLongs = l.size() < 255;
+     if (packableLongs) {
+       for (Object o : l) {
+         if (!isPackableLong(o)) {
+           packableLongs = false;
+           break;
+         }
+       }
+     }
+     if (packableLongs) {
+       out.write(ARRAYLIST_PACKED_LONG);
+       out.write(l.size());
+       for (Object o : l)
+         writePackedLong(out, o);
+     } else {
+       serializeCollection(ARRAYLIST, out, obj, objectStack);
+     }
+
+   } else if (clazz == java.util.LinkedList.class) {
+     serializeCollection(LINKEDLIST, out, obj, objectStack);
+   } else if (clazz == Vector.class) {
+     serializeCollection(VECTOR, out, obj, objectStack);
+   } else if (clazz == TreeSet.class) {
+     TreeSet l = (TreeSet) obj;
+     out.write(TREESET);
+     LongPacker.packInt(out, l.size());
+     serialize(out, l.comparator(), objectStack);
+     for (Object o : l)
+       serialize(out, o, objectStack);
+   } else if (clazz == HashSet.class) {
+     serializeCollection(HASHSET, out, obj, objectStack);
+   } else if (clazz == LinkedHashSet.class) {
+     serializeCollection(LINKEDHASHSET, out, obj, objectStack);
+   } else if (clazz == TreeMap.class) {
+     TreeMap<?, ?> l = (TreeMap<?, ?>) obj;
+     out.write(TREEMAP);
+     LongPacker.packInt(out, l.size());
+     serialize(out, l.comparator(), objectStack);
+     // entries in key order; avoids a second O(log n) lookup per key
+     for (Map.Entry<?, ?> e : l.entrySet()) {
+       serialize(out, e.getKey(), objectStack);
+       serialize(out, e.getValue(), objectStack);
+     }
+   } else if (clazz == HashMap.class) {
+     serializeMap(HASHMAP, out, obj, objectStack);
+   } else if (clazz == IdentityHashMap.class) {
+     serializeMap(IDENTITYHASHMAP, out, obj, objectStack);
+   } else if (clazz == LinkedHashMap.class) {
+     serializeMap(LINKEDHASHMAP, out, obj, objectStack);
+   } else if (clazz == Hashtable.class) {
+     serializeMap(HASHTABLE, out, obj, objectStack);
+   } else if (clazz == Properties.class) {
+     serializeMap(PROPERTIES, out, obj, objectStack);
+   } else if (clazz == Locale.class) {
+     out.write(LOCALE);
+     Locale l = (Locale) obj;
+     out.writeUTF(l.getLanguage());
+     out.writeUTF(l.getCountry());
+     out.writeUTF(l.getVariant());
+   } else {
+     out.write(NORMAL);
+     writeObject(out, obj, objectStack);
+   }
+
+ }
+
+ /**
+  * Tells whether {@code o} fits the packed-long encoding used by
+  * ARRAY_OBJECT_PACKED_LONG and ARRAYLIST_PACKED_LONG: it must be {@code null}
+  * (stored as 0) or a {@link Long} in {@code [0, Long.MAX_VALUE)} (stored as
+  * value + 1, which must not overflow).
+  */
+ private static boolean isPackableLong(Object o) {
+   if (o == null)
+     return true;
+   if (o.getClass() != Long.class)
+     return false;
+   long value = ((Long) o).longValue();
+   return value >= 0 && value != Long.MAX_VALUE;
+ }
+
+ /** Writes one element accepted by {@link #isPackableLong(Object)}. */
+ private static void writePackedLong(DataOutput out, Object o)
+     throws IOException {
+   LongPacker.packLong(out, o == null ? 0L : ((Long) o).longValue() + 1);
+ }
+
+ /**
+  * Writes {@code obj} as its packed length followed by each UTF-16 code unit
+  * packed individually. A Java {@code char} is unsigned (0..65535), so the
+  * packed values are never negative.
+  */
+ static void serializeString(DataOutput out, String obj) throws IOException {
+   final int length = obj.length();
+   LongPacker.packInt(out, length);
+   int pos = 0;
+   while (pos < length) {
+     LongPacker.packInt(out, obj.charAt(pos));
+     pos++;
+   }
+ }
+
+ private void serializeUUID(DataOutput out, UUID uuid) throws IOException {
+ out.writeLong(uuid.getMostSignificantBits());
+ out.writeLong(uuid.getLeastSignificantBits());
+ }
+
+ /**
+  * Writes {@code header}, the packed size of the map {@code obj}, then every
+  * key followed by its value, in the map's iteration order.
+  * <p/>
+  * Iterates {@code entrySet()} so each value is taken from its own entry. This
+  * avoids a second lookup per key, and stays correct for keys whose hash code
+  * changed after insertion, where {@code get(key)} would return {@code null}.
+  */
+ private void serializeMap(int header, DataOutput out, Object obj,
+     FastArrayList objectStack) throws IOException {
+   Map<?, ?> l = (Map<?, ?>) obj;
+   out.write(header);
+   LongPacker.packInt(out, l.size());
+   for (Map.Entry<?, ?> e : l.entrySet()) {
+     serialize(out, e.getKey(), objectStack);
+     serialize(out, e.getValue(), objectStack);
+   }
+ }
+
+ private void serializeCollection(int header, DataOutput out, Object obj,
+ FastArrayList objectStack) throws IOException {
+ Collection l = (Collection) obj;
+ out.write(header);
+ LongPacker.packInt(out, l.size());
+
+ for (Object o : l)
+ serialize(out, o, objectStack);
+
+ }
+
+ private void serializeByteArrayInt(DataOutput out, byte[] b)
+ throws IOException {
+ LongPacker.packInt(out, b.length);
+ out.write(b);
+ }
+
+ private void writeLongArray(DataOutput da, long[] obj) throws IOException {
+ long max = Long.MIN_VALUE;
+ long min = Long.MAX_VALUE;
+ for (long i : obj) {
+ max = Math.max(max, i);
+ min = Math.min(min, i);
+ }
+
+ if (0 <= min && max <= 255) {
+ da.write(ARRAY_LONG_B);
+ LongPacker.packInt(da, obj.length);
+ for (long l : obj)
+ da.write((int) l);
+ } else if (0 <= min && max <= Long.MAX_VALUE) {
+ da.write(ARRAY_LONG_PACKED);
+ LongPacker.packInt(da, obj.length);
+ for (long l : obj)
+ LongPacker.packLong(da, l);
+ } else if (Short.MIN_VALUE <= min && max <= Short.MAX_VALUE) {
+ da.write(ARRAY_LONG_S);
+ LongPacker.packInt(da, obj.length);
+ for (long l : obj)
+ da.writeShort((short) l);
+ } else if (Integer.MIN_VALUE <= min && max <= Integer.MAX_VALUE) {
+ da.write(ARRAY_LONG_I);
+ LongPacker.packInt(da, obj.length);
+ for (long l : obj)
+ da.writeInt((int) l);
+ } else {
+ da.write(ARRAY_LONG_L);
+ LongPacker.packInt(da, obj.length);
+ for (long l : obj)
+ da.writeLong(l);
+ }
+
+ }
+
+ /**
+  * Writes an int[] using the most compact encoding its value range allows:
+  * <ul>
+  * <li>ARRAY_INT_B_255 - all values in 0..255 and at most 255 elements (the
+  * length is stored in one byte)</li>
+  * <li>ARRAY_INT_B_INT - all values in 0..255, packed length</li>
+  * <li>ARRAY_INT_PACKED - all values non-negative, each value packed</li>
+  * <li>ARRAY_INT_S - all values fit in a short</li>
+  * <li>ARRAY_INT_I - anything else, four bytes per value</li>
+  * </ul>
+  */
+ private void writeIntArray(DataOutput da, int[] obj) throws IOException {
+   int max = Integer.MIN_VALUE;
+   int min = Integer.MAX_VALUE;
+   for (int i : obj) {
+     max = Math.max(max, i);
+     min = Math.min(min, i);
+   }
+
+   boolean fitsInByte = 0 <= min && max <= 255;
+   // Every value must lie within [Short.MIN_VALUE, Short.MAX_VALUE];
+   // otherwise writeShort() below would silently truncate it.
+   boolean fitsInShort = Short.MIN_VALUE <= min && max <= Short.MAX_VALUE;
+
+   if (obj.length <= 255 && fitsInByte) {
+     da.write(ARRAY_INT_B_255);
+     da.write(obj.length);
+     for (int i : obj)
+       da.write(i);
+   } else if (fitsInByte) {
+     da.write(ARRAY_INT_B_INT);
+     LongPacker.packInt(da, obj.length);
+     for (int i : obj)
+       da.write(i);
+   } else if (0 <= min) {
+     da.write(ARRAY_INT_PACKED);
+     LongPacker.packInt(da, obj.length);
+     for (int l : obj)
+       LongPacker.packInt(da, l);
+   } else if (fitsInShort) {
+     da.write(ARRAY_INT_S);
+     LongPacker.packInt(da, obj.length);
+     for (int i : obj)
+       da.writeShort(i);
+   } else {
+     da.write(ARRAY_INT_I);
+     LongPacker.packInt(da, obj.length);
+     for (int i : obj)
+       da.writeInt(i);
+   }
+ }
+
+ /**
+  * Writes a boxed int. -1..8 and Integer.MIN_VALUE each get a dedicated
+  * one-byte header. 9..254 get a header plus one unsigned byte. Other
+  * negatives get a header plus the packed magnitude; everything else gets a
+  * header plus the packed value.
+  */
+ private void writeInteger(DataOutput da, final int val) throws IOException {
+   switch (val) {
+   case -1:
+     da.write(INTEGER_MINUS_1);
+     return;
+   case 0:
+     da.write(INTEGER_0);
+     return;
+   case 1:
+     da.write(INTEGER_1);
+     return;
+   case 2:
+     da.write(INTEGER_2);
+     return;
+   case 3:
+     da.write(INTEGER_3);
+     return;
+   case 4:
+     da.write(INTEGER_4);
+     return;
+   case 5:
+     da.write(INTEGER_5);
+     return;
+   case 6:
+     da.write(INTEGER_6);
+     return;
+   case 7:
+     da.write(INTEGER_7);
+     return;
+   case 8:
+     da.write(INTEGER_8);
+     return;
+   case Integer.MIN_VALUE:
+     da.write(INTEGER_MINUS_MAX);
+     return;
+   default:
+     break;
+   }
+
+   if (val < 0) {
+     da.write(INTEGER_PACK_NEG);
+     LongPacker.packInt(da, -val);
+   } else if (val < 255) {
+     da.write(INTEGER_255);
+     da.write(val);
+   } else {
+     da.write(INTEGER_PACK);
+     LongPacker.packInt(da, val);
+   }
+ }
+
+ /**
+  * Writes a boxed long. -1..8 and Long.MIN_VALUE each get a dedicated one-byte
+  * header. 9..254 get a header plus one unsigned byte. Other negatives get a
+  * header plus the packed magnitude; everything else gets a header plus the
+  * packed value.
+  */
+ private void writeLong(DataOutput da, final long val) throws IOException {
+   if (val == Long.MIN_VALUE) {
+     da.write(LONG_MINUS_MAX);
+     return;
+   }
+   if (val >= -1 && val <= 8) {
+     switch ((int) val) {
+     case -1:
+       da.write(LONG_MINUS_1);
+       break;
+     case 0:
+       da.write(LONG_0);
+       break;
+     case 1:
+       da.write(LONG_1);
+       break;
+     case 2:
+       da.write(LONG_2);
+       break;
+     case 3:
+       da.write(LONG_3);
+       break;
+     case 4:
+       da.write(LONG_4);
+       break;
+     case 5:
+       da.write(LONG_5);
+       break;
+     case 6:
+       da.write(LONG_6);
+       break;
+     case 7:
+       da.write(LONG_7);
+       break;
+     case 8:
+       da.write(LONG_8);
+       break;
+     }
+     return;
+   }
+
+   if (val < 0) {
+     da.write(LONG_PACK_NEG);
+     LongPacker.packLong(da, -val);
+   } else if (val < 255) {
+     da.write(LONG_255);
+     da.write((int) val);
+   } else {
+     da.write(LONG_PACK);
+     LongPacker.packLong(da, val);
+   }
+ }
+
+ /**
+  * Deserializes a single object from {@code buf}, which must contain exactly
+  * one serialized object.
+  *
+  * @throws IOException if reading fails
+  * @throws ClassNotFoundException if a referenced class cannot be loaded
+  * @throws InternalError if bytes remain after the object was read
+  */
+ public Object deserialize(byte[] buf) throws ClassNotFoundException,
+     IOException {
+   final DataInputOutput in = new DataInputOutput(buf);
+   final Object result = deserialize(in);
+   final int leftover = in.available();
+   if (leftover != 0) {
+     throw new InternalError("bytes left: " + leftover);
+   }
+   return result;
+ }
+
+ static String deserializeString(DataInput buf) throws IOException {
+ int len = LongPacker.unpackInt(buf);
+ char[] b = new char[len];
+ for (int i = 0; i < len; i++)
+ b[i] = (char) LongPacker.unpackInt(buf);
+
+ return new String(b);
+ }
+
+ /**
+  * Reads one top-level object from {@code is}, starting a new object graph
+  * with no previously read objects.
+  */
+ public Object deserialize(DataInput is) throws IOException,
+     ClassNotFoundException {
+   final FastArrayList freshGraph = null;
+   return deserialize(is, freshGraph);
+ }
+
+ /**
+  * Reads one object written by serialize(DataOutput, Object, FastArrayList).
+  * <p/>
+  * The first unsigned byte is the type header. Headers of simple values are
+  * decoded by the first switch without allocating an object stack; the result
+  * is only recorded when a stack was passed in. All other headers are decoded
+  * by the second switch, which creates the stack if it is missing.
+  *
+  * @param is source of the serialized bytes
+  * @param objectStack objects already read in the current object graph, or
+  *          {@code null} for a new top-level object; OBJECT_STACK headers
+  *          refer to indices in this list
+  * @return the deserialized object, possibly {@code null}
+  * @throws IOException if reading fails
+  * @throws ClassNotFoundException if a referenced class cannot be loaded
+  * @throws InternalError on an unknown header or on data produced by a
+  *           different serializer
+  */
+ public Object deserialize(DataInput is, FastArrayList objectStack)
+ throws IOException, ClassNotFoundException {
+
+ Object ret = null;
+
+ // type header written by serialize(); readUnsignedByte() yields 0..255
+ final int head = is.readUnsignedByte();
+
+ /** first try to deserialize object without allocating object stack */
+ switch (head) {
+ case NULL:
+ break;
+ case BOOLEAN_TRUE:
+ ret = Boolean.TRUE;
+ break;
+ case BOOLEAN_FALSE:
+ ret = Boolean.FALSE;
+ break;
+ case INTEGER_MINUS_1:
+ ret = Integer.valueOf(-1);
+ break;
+ case INTEGER_0:
+ ret = Integer.valueOf(0);
+ break;
+ case INTEGER_1:
+ ret = Integer.valueOf(1);
+ break;
+ case INTEGER_2:
+ ret = Integer.valueOf(2);
+ break;
+ case INTEGER_3:
+ ret = Integer.valueOf(3);
+ break;
+ case INTEGER_4:
+ ret = Integer.valueOf(4);
+ break;
+ case INTEGER_5:
+ ret = Integer.valueOf(5);
+ break;
+ case INTEGER_6:
+ ret = Integer.valueOf(6);
+ break;
+ case INTEGER_7:
+ ret = Integer.valueOf(7);
+ break;
+ case INTEGER_8:
+ ret = Integer.valueOf(8);
+ break;
+ case INTEGER_MINUS_MAX:
+ ret = Integer.valueOf(Integer.MIN_VALUE);
+ break;
+ case INTEGER_255:
+ ret = Integer.valueOf(is.readUnsignedByte());
+ break;
+ case INTEGER_PACK_NEG:
+ // the magnitude of the negative value is stored packed
+ ret = Integer.valueOf(-LongPacker.unpackInt(is));
+ break;
+ case INTEGER_PACK:
+ ret = Integer.valueOf(LongPacker.unpackInt(is));
+ break;
+ case LONG_MINUS_1:
+ ret = Long.valueOf(-1);
+ break;
+ case LONG_0:
+ ret = Long.valueOf(0);
+ break;
+ case LONG_1:
+ ret = Long.valueOf(1);
+ break;
+ case LONG_2:
+ ret = Long.valueOf(2);
+ break;
+ case LONG_3:
+ ret = Long.valueOf(3);
+ break;
+ case LONG_4:
+ ret = Long.valueOf(4);
+ break;
+ case LONG_5:
+ ret = Long.valueOf(5);
+ break;
+ case LONG_6:
+ ret = Long.valueOf(6);
+ break;
+ case LONG_7:
+ ret = Long.valueOf(7);
+ break;
+ case LONG_8:
+ ret = Long.valueOf(8);
+ break;
+ case LONG_255:
+ ret = Long.valueOf(is.readUnsignedByte());
+ break;
+ case LONG_PACK_NEG:
+ // the magnitude of the negative value is stored packed
+ ret = Long.valueOf(-LongPacker.unpackLong(is));
+ break;
+ case LONG_PACK:
+ ret = Long.valueOf(LongPacker.unpackLong(is));
+ break;
+ case LONG_MINUS_MAX:
+ ret = Long.valueOf(Long.MIN_VALUE);
+ break;
+ case SHORT_MINUS_1:
+ ret = Short.valueOf((short) -1);
+ break;
+ case SHORT_0:
+ ret = Short.valueOf((short) 0);
+ break;
+ case SHORT_1:
+ ret = Short.valueOf((short) 1);
+ break;
+ case SHORT_255:
+ ret = Short.valueOf((short) is.readUnsignedByte());
+ break;
+ case SHORT_FULL:
+ // two-byte payload
+ ret = Short.valueOf(is.readShort());
+ break;
+ case BYTE_MINUS_1:
+ ret = Byte.valueOf((byte) -1);
+ break;
+ case BYTE_0:
+ ret = Byte.valueOf((byte) 0);
+ break;
+ case BYTE_1:
+ ret = Byte.valueOf((byte) 1);
+ break;
+ case BYTE_FULL:
+ // one-byte payload
+ ret = Byte.valueOf(is.readByte());
+ break;
+ case SHORT_ARRAY:
+ int size = LongPacker.unpackInt(is);
+ ret = new short[size];
+ for (int i = 0; i < size; i++)
+ ((short[]) ret)[i] = is.readShort();
+ break;
+ case BOOLEAN_ARRAY:
+ size = LongPacker.unpackInt(is);
+ ret = new boolean[size];
+ for (int i = 0; i < size; i++)
+ ((boolean[]) ret)[i] = is.readBoolean();
+ break;
+ case DOUBLE_ARRAY:
+ size = LongPacker.unpackInt(is);
+ ret = new double[size];
+ for (int i = 0; i < size; i++)
+ ((double[]) ret)[i] = is.readDouble();
+ break;
+ case FLOAT_ARRAY:
+ size = LongPacker.unpackInt(is);
+ ret = new float[size];
+ for (int i = 0; i < size; i++)
+ ((float[]) ret)[i] = is.readFloat();
+ break;
+ case CHAR_ARRAY:
+ size = LongPacker.unpackInt(is);
+ ret = new char[size];
+ for (int i = 0; i < size; i++)
+ ((char[]) ret)[i] = is.readChar();
+ break;
+ case CHAR:
+ ret = Character.valueOf(is.readChar());
+ break;
+ case FLOAT_MINUS_1:
+ ret = Float.valueOf(-1);
+ break;
+ case FLOAT_0:
+ ret = Float.valueOf(0);
+ break;
+ case FLOAT_1:
+ ret = Float.valueOf(1);
+ break;
+ case FLOAT_255:
+ ret = Float.valueOf(is.readUnsignedByte());
+ break;
+ case FLOAT_SHORT:
+ ret = Float.valueOf(is.readShort());
+ break;
+ case FLOAT_FULL:
+ ret = Float.valueOf(is.readFloat());
+ break;
+ case DOUBLE_MINUS_1:
+ ret = Double.valueOf(-1);
+ break;
+ case DOUBLE_0:
+ ret = Double.valueOf(0);
+ break;
+ case DOUBLE_1:
+ ret = Double.valueOf(1);
+ break;
+ case DOUBLE_255:
+ ret = Double.valueOf(is.readUnsignedByte());
+ break;
+ case DOUBLE_SHORT:
+ ret = Double.valueOf(is.readShort());
+ break;
+ case DOUBLE_FULL:
+ ret = Double.valueOf(is.readDouble());
+ break;
+ case BIGINTEGER:
+ ret = new BigInteger(deserializeArrayByteInt(is));
+ break;
+ case BIGDECIMAL:
+ // unscaled value bytes, then the packed scale
+ ret = new BigDecimal(new BigInteger(deserializeArrayByteInt(is)),
+ LongPacker.unpackInt(is));
+ break;
+ case STRING:
+ ret = deserializeString(is);
+ break;
+ case STRING_EMPTY:
+ ret = JDBMUtils.EMPTY_STRING;
+ break;
+
+ case CLASS:
+ ret = deserializeClass(is);
+ break;
+ case DATE:
+ ret = new Date(is.readLong());
+ break;
+ case UUID:
+ ret = deserializeUUID(is);
+ break;
+ case ARRAY_INT_B_255:
+ ret = deserializeArrayIntB255(is);
+ break;
+ case ARRAY_INT_B_INT:
+ ret = deserializeArrayIntBInt(is);
+ break;
+ case ARRAY_INT_S:
+ ret = deserializeArrayIntSInt(is);
+ break;
+ case ARRAY_INT_I:
+ ret = deserializeArrayIntIInt(is);
+ break;
+ case ARRAY_INT_PACKED:
+ ret = deserializeArrayIntPack(is);
+ break;
+ case ARRAY_LONG_B:
+ ret = deserializeArrayLongB(is);
+ break;
+ case ARRAY_LONG_S:
+ ret = deserializeArrayLongS(is);
+ break;
+ case ARRAY_LONG_I:
+ ret = deserializeArrayLongI(is);
+ break;
+ case ARRAY_LONG_L:
+ ret = deserializeArrayLongL(is);
+ break;
+ case ARRAY_LONG_PACKED:
+ ret = deserializeArrayLongPack(is);
+ break;
+ case ARRAYLIST_PACKED_LONG:
+ ret = deserializeArrayListPackedLong(is);
+ break;
+ case ARRAY_BYTE_INT:
+ ret = deserializeArrayByteInt(is);
+ break;
+ case LOCALE:
+ // language, country, variant - same order as written by serialize()
+ ret = new Locale(is.readUTF(), is.readUTF(), is.readUTF());
+ break;
+ case JDBMLINKEDLIST:
+ ret = LinkedList.deserialize(is, this);
+ break;
+ case HTREE:
+ ret = HTree.deserialize(is, this);
+ break;
+ case BTREE:
+ ret = BTree.readExternal(is, this);
+ break;
+ case BTREE_NODE_LEAF:
+ throw new InternalError("BPage header, wrong serializer used");
+ case BTREE_NODE_NONLEAF:
+ throw new InternalError("BPage header, wrong serializer used");
+ case JAVA_SERIALIZATION:
+ throw new InternalError(
+ "Wrong header, data were probably serialized with OutputStream, not with JDBM serialization");
+
+ case -1:
+ // NOTE(review): readUnsignedByte() returns 0..255 and throws
+ // EOFException itself at end of input, so this label looks unreachable.
+ throw new EOFException();
+
+ }
+
+ // A simple value was decoded (or NULL). serialize() also pushes these onto a
+ // non-null stack, so record it here to keep stack indices aligned.
+ if (ret != null || head == NULL) {
+ if (objectStack != null)
+ objectStack.add(ret);
+ return ret;
+ }
+
+ /** something else which needs object stack initialized */
+
+ if (objectStack == null)
+ objectStack = new FastArrayList();
+ int oldObjectStackSize = objectStack.size();
+
+ switch (head) {
+ case NORMAL:
+ ret = readObject(is, objectStack);
+ break;
+ case OBJECT_STACK:
+ // back-reference: packed index of an object read earlier in this graph
+ ret = objectStack.get(LongPacker.unpackInt(is));
+ break;
+ case ARRAYLIST:
+ ret = deserializeArrayList(is, objectStack);
+ break;
+ case ARRAY_OBJECT:
+ ret = deserializeArrayObject(is, objectStack);
+ break;
+ case ARRAY_OBJECT_PACKED_LONG:
+ ret = deserializeArrayObjectPackedLong(is);
+ break;
+ case LINKEDLIST:
+ ret = deserializeLinkedList(is, objectStack);
+ break;
+ case TREESET:
+ ret = deserializeTreeSet(is, objectStack);
+ break;
+ case HASHSET:
+ ret = deserializeHashSet(is, objectStack);
+ break;
+ case LINKEDHASHSET:
+ ret = deserializeLinkedHashSet(is, objectStack);
+ break;
+ case VECTOR:
+ ret = deserializeVector(is, objectStack);
+ break;
+ case TREEMAP:
+ ret = deserializeTreeMap(is, objectStack);
+ break;
+ case HASHMAP:
+ ret = deserializeHashMap(is, objectStack);
+ break;
+ case IDENTITYHASHMAP:
+ ret = deserializeIdentityHashMap(is, objectStack);
+ break;
+ case LINKEDHASHMAP:
+ ret = deserializeLinkedHashMap(is, objectStack);
+ break;
+ case HASHTABLE:
+ ret = deserializeHashtable(is, objectStack);
+ break;
+ case PROPERTIES:
+ ret = deserializeProperties(is, objectStack);
+ break;
+
+ default:
+ throw new InternalError("Unknown serialization header: " + head);
+ }
+
+ // Container readers push themselves before their elements, and an
+ // OBJECT_STACK reference is not a new object; push only when nothing was
+ // added so far.
+ if (head != OBJECT_STACK && objectStack.size() == oldObjectStackSize) {
+ // check if object was not already added to stack as part of collection
+ objectStack.add(ret);
+ }
+
+ return ret;
+ }
+
+ /**
+  * Reads a class written as its name (CLASS header) and loads it with
+  * {@link Class#forName(String)}.
+  */
+ private Class deserializeClass(DataInput is) throws IOException,
+     ClassNotFoundException {
+   return Class.forName((String) deserialize(is));
+ }
+
+ private byte[] deserializeArrayByteInt(DataInput is) throws IOException {
+ int size = LongPacker.unpackInt(is);
+ byte[] b = new byte[size];
+ is.readFully(b);
+ return b;
+ }
+
+ private long[] deserializeArrayLongL(DataInput is) throws IOException {
+ int size = LongPacker.unpackInt(is);
+ long[] ret = new long[size];
+ for (int i = 0; i < size; i++)
+ ret[i] = is.readLong();
+ return ret;
+ }
+
+ private long[] deserializeArrayLongI(DataInput is) throws IOException {
+ int size = LongPacker.unpackInt(is);
+ long[] ret = new long[size];
+ for (int i = 0; i < size; i++)
+ ret[i] = is.readInt();
+ return ret;
+ }
+
+ private long[] deserializeArrayLongS(DataInput is) throws IOException {
+ int size = LongPacker.unpackInt(is);
+ long[] ret = new long[size];
+ for (int i = 0; i < size; i++)
+ ret[i] = is.readShort();
+ return ret;
+ }
+
+ /**
+  * Reads a long[] stored as ARRAY_LONG_B: packed length, then one unsigned
+  * byte per element.
+  */
+ private long[] deserializeArrayLongB(DataInput is) throws IOException {
+   int size = LongPacker.unpackInt(is);
+   long[] ret = new long[size];
+   // readUnsignedByte() always yields 0..255 and throws EOFException itself
+   // at end of input, so no extra negative check is needed.
+   for (int i = 0; i < size; i++)
+     ret[i] = is.readUnsignedByte();
+   return ret;
+ }
+
+ private int[] deserializeArrayIntIInt(DataInput is) throws IOException {
+ int size = LongPacker.unpackInt(is);
+ int[] ret = new int[size];
+ for (int i = 0; i < size; i++)
+ ret[i] = is.readInt();
+ return ret;
+ }
+
+ private int[] deserializeArrayIntSInt(DataInput is) throws IOException {
+ int size = LongPacker.unpackInt(is);
+ int[] ret = new int[size];
+ for (int i = 0; i < size; i++)
+ ret[i] = is.readShort();
+ return ret;
+ }
+
+ /**
+  * Reads an int[] stored as ARRAY_INT_B_INT: packed length, then one unsigned
+  * byte per element.
+  */
+ private int[] deserializeArrayIntBInt(DataInput is) throws IOException {
+   int size = LongPacker.unpackInt(is);
+   int[] ret = new int[size];
+   // readUnsignedByte() always yields 0..255 and throws EOFException itself
+   // at end of input, so no extra negative check is needed.
+   for (int i = 0; i < size; i++)
+     ret[i] = is.readUnsignedByte();
+   return ret;
+ }
+
+ private int[] deserializeArrayIntPack(DataInput is) throws IOException {
+ int size = LongPacker.unpackInt(is);
+ if (size < 0)
+ throw new EOFException();
+
+ int[] ret = new int[size];
+ for (int i = 0; i < size; i++) {
+ ret[i] = LongPacker.unpackInt(is);
+ }
+ return ret;
+ }
+
+ private long[] deserializeArrayLongPack(DataInput is) throws IOException {
+ int size = LongPacker.unpackInt(is);
+ if (size < 0)
+ throw new EOFException();
+
+ long[] ret = new long[size];
+ for (int i = 0; i < size; i++) {
+ ret[i] = LongPacker.unpackLong(is);
+ }
+ return ret;
+ }
+
+ private UUID deserializeUUID(DataInput is) throws IOException {
+ return new UUID(is.readLong(), is.readLong());
+ }
+
+ /**
+  * Reads an int[] stored as ARRAY_INT_B_255: the length as one unsigned byte,
+  * then one unsigned byte per element.
+  */
+ private int[] deserializeArrayIntB255(DataInput is) throws IOException {
+   // readUnsignedByte() always yields 0..255 and throws EOFException itself
+   // at end of input, so neither the size nor the elements need a sign check.
+   int size = is.readUnsignedByte();
+   int[] ret = new int[size];
+   for (int i = 0; i < size; i++)
+     ret[i] = is.readUnsignedByte();
+   return ret;
+ }
+
+ private Object[] deserializeArrayObject(DataInput is,
+ FastArrayList objectStack) throws IOException, ClassNotFoundException {
+ int size = LongPacker.unpackInt(is);
+ // Read class id for components
+ int classId = LongPacker.unpackInt(is);
+ Class clazz = classId2class.get(classId);
+ if (clazz == null)
+ clazz = Object.class;
+
+ Object[] s = (Object[]) Array.newInstance(clazz, size);
+ objectStack.add(s);
+ for (int i = 0; i < size; i++)
+ s[i] = deserialize(is, objectStack);
+ return s;
+ }
+
+ private Object[] deserializeArrayObjectPackedLong(DataInput is)
+ throws IOException, ClassNotFoundException {
+ int size = is.readUnsignedByte();
+ Object[] s = new Object[size];
+ for (int i = 0; i < size; i++) {
+ long l = LongPacker.unpackLong(is);
+ if (l == 0)
+ s[i] = null;
+ else
+ s[i] = Long.valueOf(l - 1);
+ }
+ return s;
+ }
+
+ private ArrayList<Object> deserializeArrayList(DataInput is,
+ FastArrayList objectStack) throws IOException, ClassNotFoundException {
+ int size = LongPacker.unpackInt(is);
+ ArrayList<Object> s = new ArrayList<Object>(size);
+ objectStack.add(s);
+ for (int i = 0; i < size; i++) {
+ s.add(deserialize(is, objectStack));
+ }
+ return s;
+ }
+
+ /**
+  * Reads an ArrayList stored as ARRAYLIST_PACKED_LONG: the size as one
+  * unsigned byte, then per element a packed long where 0 stands for
+  * {@code null} and any other value is the stored Long plus one.
+  */
+ private ArrayList<Object> deserializeArrayListPackedLong(DataInput is)
+     throws IOException, ClassNotFoundException {
+   // readUnsignedByte() always yields 0..255 (EOFException at end of input),
+   // so the size needs no sign check.
+   int size = is.readUnsignedByte();
+   ArrayList<Object> s = new ArrayList<Object>(size);
+   for (int i = 0; i < size; i++) {
+     long l = LongPacker.unpackLong(is);
+     s.add(l == 0 ? null : Long.valueOf(l - 1));
+   }
+   return s;
+ }
+
+ private java.util.LinkedList deserializeLinkedList(DataInput is,
+ FastArrayList objectStack) throws IOException, ClassNotFoundException {
+ int size = LongPacker.unpackInt(is);
+ java.util.LinkedList s = new java.util.LinkedList();
+ objectStack.add(s);
+ for (int i = 0; i < size; i++)
+ s.add(deserialize(is, objectStack));
+ return s;
+ }
+
+ private Vector<Object> deserializeVector(DataInput is,
+ FastArrayList objectStack) throws IOException, ClassNotFoundException {
+ int size = LongPacker.unpackInt(is);
+ Vector<Object> s = new Vector<Object>(size);
+ objectStack.add(s);
+ for (int i = 0; i < size; i++)
+ s.add(deserialize(is, objectStack));
+ return s;
+ }
+
+ private HashSet<Object> deserializeHashSet(DataInput is,
+ FastArrayList objectStack) throws IOException, ClassNotFoundException {
+ int size = LongPacker.unpackInt(is);
+ HashSet<Object> s = new HashSet<Object>(size);
+ objectStack.add(s);
+ for (int i = 0; i < size; i++)
+ s.add(deserialize(is, objectStack));
+ return s;
+ }
+
+ private LinkedHashSet<Object> deserializeLinkedHashSet(DataInput is,
+ FastArrayList objectStack) throws IOException, ClassNotFoundException {
+ int size = LongPacker.unpackInt(is);
+ LinkedHashSet<Object> s = new LinkedHashSet<Object>(size);
+ objectStack.add(s);
+ for (int i = 0; i < size; i++)
+ s.add(deserialize(is, objectStack));
+ return s;
+ }
+
+  /**
+   * Reads a TreeSet: a packed element count, the (possibly null) comparator,
+   * then each element via the generic deserializer.
+   * <p>
+   * A placeholder set is pushed onto the object stack before the comparator
+   * is read, so stack indices stay in the same order as the original code
+   * produced (NOTE(review): assumes the writer registers the set before its
+   * comparator). When a comparator is present the set must be recreated with
+   * it, and the reserved stack slot is then pointed at the set actually
+   * returned. Previously the slot kept the empty placeholder, so any later
+   * back-reference to this set resolved to the wrong, empty object.
+   */
+  private TreeSet<Object> deserializeTreeSet(DataInput is,
+      FastArrayList objectStack) throws IOException, ClassNotFoundException {
+    int size = LongPacker.unpackInt(is);
+    TreeSet<Object> s = new TreeSet<Object>();
+    int stackIndex = objectStack.size();
+    objectStack.add(s);
+    Comparator comparator = (Comparator) deserialize(is, objectStack);
+    if (comparator != null) {
+      s = new TreeSet<Object>(comparator);
+      // FastArrayList has no set(); its backing array is reachable from this
+      // enclosing class, so replace the placeholder in place.
+      objectStack.elementData[stackIndex] = s;
+    }
+    for (int i = 0; i < size; i++) {
+      s.add(deserialize(is, objectStack));
+    }
+    return s;
+  }
+
+  /**
+   * Reads a TreeMap: a packed entry count, the (possibly null) comparator,
+   * then key/value pairs (key first) via the generic deserializer.
+   * <p>
+   * A placeholder map is pushed onto the object stack before the comparator
+   * is read, so stack indices stay in the same order as the original code
+   * produced (NOTE(review): assumes the writer registers the map before its
+   * comparator). When a comparator is present the map must be recreated with
+   * it, and the reserved stack slot is then pointed at the map actually
+   * returned. Previously the slot kept the empty placeholder, so any later
+   * back-reference to this map resolved to the wrong, empty object.
+   */
+  private TreeMap<Object, Object> deserializeTreeMap(DataInput is,
+      FastArrayList objectStack) throws IOException, ClassNotFoundException {
+    int size = LongPacker.unpackInt(is);
+
+    TreeMap<Object, Object> s = new TreeMap<Object, Object>();
+    int stackIndex = objectStack.size();
+    objectStack.add(s);
+    Comparator comparator = (Comparator) deserialize(is, objectStack);
+    if (comparator != null) {
+      s = new TreeMap<Object, Object>(comparator);
+      // FastArrayList has no set(); its backing array is reachable from this
+      // enclosing class, so replace the placeholder in place.
+      objectStack.elementData[stackIndex] = s;
+    }
+    for (int i = 0; i < size; i++) {
+      Object key = deserialize(is, objectStack);
+      Object value = deserialize(is, objectStack);
+      s.put(key, value);
+    }
+    return s;
+  }
+
+ private HashMap<Object, Object> deserializeHashMap(DataInput is,
+ FastArrayList objectStack) throws IOException, ClassNotFoundException {
+ int size = LongPacker.unpackInt(is);
+
+ HashMap<Object, Object> s = new HashMap<Object, Object>(size);
+ objectStack.add(s);
+ for (int i = 0; i < size; i++)
+ s.put(deserialize(is, objectStack), deserialize(is, objectStack));
+ return s;
+ }
+
+ private IdentityHashMap<Object, Object> deserializeIdentityHashMap(
+ DataInput is, FastArrayList objectStack) throws IOException,
+ ClassNotFoundException {
+ int size = LongPacker.unpackInt(is);
+
+ IdentityHashMap<Object, Object> s = new IdentityHashMap<Object, Object>(
+ size);
+ objectStack.add(s);
+ for (int i = 0; i < size; i++)
+ s.put(deserialize(is, objectStack), deserialize(is, objectStack));
+ return s;
+ }
+
+ private LinkedHashMap<Object, Object> deserializeLinkedHashMap(DataInput is,
+ FastArrayList objectStack) throws IOException, ClassNotFoundException {
+ int size = LongPacker.unpackInt(is);
+
+ LinkedHashMap<Object, Object> s = new LinkedHashMap<Object, Object>(size);
+ objectStack.add(s);
+ for (int i = 0; i < size; i++)
+ s.put(deserialize(is, objectStack), deserialize(is, objectStack));
+ return s;
+ }
+
+ private Hashtable<Object, Object> deserializeHashtable(DataInput is,
+ FastArrayList objectStack) throws IOException, ClassNotFoundException {
+ int size = LongPacker.unpackInt(is);
+
+ Hashtable<Object, Object> s = new Hashtable<Object, Object>(size);
+ objectStack.add(s);
+ for (int i = 0; i < size; i++)
+ s.put(deserialize(is, objectStack), deserialize(is, objectStack));
+ return s;
+ }
+
+ private Properties deserializeProperties(DataInput is,
+ FastArrayList objectStack) throws IOException, ClassNotFoundException {
+ int size = LongPacker.unpackInt(is);
+
+ Properties s = new Properties();
+ objectStack.add(s);
+ for (int i = 0; i < size; i++)
+ s.put(deserialize(is, objectStack), deserialize(is, objectStack));
+ return s;
+ }
+
+ /**
+ * Utility class similar to ArrayList, but with fast identity search.
+ */
+ static class FastArrayList<K> {
+
+ private int size = 0;
+ private K[] elementData = (K[]) new Object[8];
+
+ K get(int index) {
+ if (index >= size)
+ throw new IndexOutOfBoundsException();
+ return elementData[index];
+ }
+
+ void add(K o) {
+ if (elementData.length == size) {
+ // grow array if necessary
+ elementData = Arrays.copyOf(elementData, elementData.length * 2);
+ }
+
+ elementData[size] = o;
+ size++;
+ }
+
+ int size() {
+ return size;
+ }
+
+ /**
+ * This method is reason why ArrayList is not used. Search an item in list
+ * and returns its index. It uses identity rather than 'equalsTo' One could
+ * argue that TreeMap should be used instead, but we do not expect large
+ * object trees. This search is VERY FAST compared to Maps, it does not
+ * allocate new instances or uses method calls.
+ *
+ * @param obj
+ * @return index of object in list or -1 if not found
+ */
+ int identityIndexOf(Object obj) {
+ for (int i = 0; i < size; i++) {
+ if (obj == elementData[i])
+ return i;
+ }
+ return -1;
+ }
+
+ }
+
+}
Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/SerializationHeader.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/SerializationHeader.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/SerializationHeader.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/SerializationHeader.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+/**
+ * Header byte, is used at start of each record to indicate data type WARNING
+ * !!! values below must be unique !!!!!
+ */
+public interface SerializationHeader {
+
+ final static int NULL = 0;
+ final static int NORMAL = 1;
+ final static int BOOLEAN_TRUE = 2;
+ final static int BOOLEAN_FALSE = 3;
+ final static int INTEGER_MINUS_1 = 4;
+ final static int INTEGER_0 = 5;
+ final static int INTEGER_1 = 6;
+ final static int INTEGER_2 = 7;
+ final static int INTEGER_3 = 8;
+ final static int INTEGER_4 = 9;
+ final static int INTEGER_5 = 10;
+ final static int INTEGER_6 = 11;
+ final static int INTEGER_7 = 12;
+ final static int INTEGER_8 = 13;
+ final static int INTEGER_255 = 14;
+ final static int INTEGER_PACK_NEG = 15;
+ final static int INTEGER_PACK = 16;
+ final static int LONG_MINUS_1 = 17;
+ final static int LONG_0 = 18;
+ final static int LONG_1 = 19;
+ final static int LONG_2 = 20;
+ final static int LONG_3 = 21;
+ final static int LONG_4 = 22;
+ final static int LONG_5 = 23;
+ final static int LONG_6 = 24;
+ final static int LONG_7 = 25;
+ final static int LONG_8 = 26;
+ final static int LONG_PACK_NEG = 27;
+ final static int LONG_PACK = 28;
+ final static int LONG_255 = 29;
+ final static int LONG_MINUS_MAX = 30;
+ final static int SHORT_MINUS_1 = 31;
+ final static int SHORT_0 = 32;
+ final static int SHORT_1 = 33;
+ final static int SHORT_255 = 34;
+ final static int SHORT_FULL = 35;
+ final static int BYTE_MINUS_1 = 36;
+ final static int BYTE_0 = 37;
+ final static int BYTE_1 = 38;
+ final static int BYTE_FULL = 39;
+ final static int CHAR = 40;
+ final static int FLOAT_MINUS_1 = 41;
+ final static int FLOAT_0 = 42;
+ final static int FLOAT_1 = 43;
+ final static int FLOAT_255 = 44;
+ final static int FLOAT_SHORT = 45;
+ final static int FLOAT_FULL = 46;
+ final static int DOUBLE_MINUS_1 = 47;
+ final static int DOUBLE_0 = 48;
+ final static int DOUBLE_1 = 49;
+ final static int DOUBLE_255 = 50;
+ final static int DOUBLE_SHORT = 51;
+ final static int DOUBLE_FULL = 52;
+ final static int DOUBLE_ARRAY = 53;
+ final static int BIGDECIMAL = 54;
+ final static int BIGINTEGER = 55;
+ final static int FLOAT_ARRAY = 56;
+ final static int INTEGER_MINUS_MAX = 57;
+ final static int SHORT_ARRAY = 58;
+ final static int BOOLEAN_ARRAY = 59;
+
+ final static int ARRAY_INT_B_255 = 60;
+ final static int ARRAY_INT_B_INT = 61;
+ final static int ARRAY_INT_S = 62;
+ final static int ARRAY_INT_I = 63;
+ final static int ARRAY_INT_PACKED = 64;
+
+ final static int ARRAY_LONG_B = 65;
+ final static int ARRAY_LONG_S = 66;
+ final static int ARRAY_LONG_I = 67;
+ final static int ARRAY_LONG_L = 68;
+ final static int ARRAY_LONG_PACKED = 69;
+
+ final static int CHAR_ARRAY = 70;
+ final static int ARRAY_BYTE_INT = 71;
+
+ final static int NOTUSED_ARRAY_OBJECT_255 = 72;
+ final static int ARRAY_OBJECT = 73;
+ // special cases for BTree values which stores references
+ final static int ARRAY_OBJECT_PACKED_LONG = 74;
+ final static int ARRAYLIST_PACKED_LONG = 75;
+
+ final static int STRING_EMPTY = 101;
+ final static int NOTUSED_STRING_255 = 102;
+ final static int STRING = 103;
+ final static int NOTUSED_ARRAYLIST_255 = 104;
+ final static int ARRAYLIST = 105;
+
+ final static int NOTUSED_TREEMAP_255 = 106;
+ final static int TREEMAP = 107;
+ final static int NOTUSED_HASHMAP_255 = 108;
+ final static int HASHMAP = 109;
+ final static int NOTUSED_LINKEDHASHMAP_255 = 110;
+ final static int LINKEDHASHMAP = 111;
+
+ final static int NOTUSED_TREESET_255 = 112;
+ final static int TREESET = 113;
+ final static int NOTUSED_HASHSET_255 = 114;
+ final static int HASHSET = 115;
+ final static int NOTUSED_LINKEDHASHSET_255 = 116;
+ final static int LINKEDHASHSET = 117;
+ final static int NOTUSED_LINKEDLIST_255 = 118;
+ final static int LINKEDLIST = 119;
+
+ final static int NOTUSED_VECTOR_255 = 120;
+ final static int VECTOR = 121;
+ final static int IDENTITYHASHMAP = 122;
+ final static int HASHTABLE = 123;
+ final static int LOCALE = 124;
+ final static int PROPERTIES = 125;
+
+ final static int CLASS = 126;
+ final static int DATE = 127;
+ final static int UUID = 128;
+
+ static final int JDBMLINKEDLIST = 159;
+ static final int HTREE = 160;
+
+ final static int BTREE = 161;
+
+ static final int BTREE_NODE_LEAF = 162;
+ static final int BTREE_NODE_NONLEAF = 163;
+ static final int HTREE_BUCKET = 164;
+ static final int HTREE_DIRECTORY = 165;
+ /**
+ * used for reference to already serialized object in object graph
+ */
+ static final int OBJECT_STACK = 166;
+ static final int JAVA_SERIALIZATION = 172;
+
+}
Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/Serializer.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/Serializer.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/Serializer.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/Serializer.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.jdbm;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Pluggable serialization strategy, used instead of a class' normal
+ * serialization.
+ *
+ * @param <A> type of object this serializer writes and reads
+ */
+public interface Serializer<A> {
+
+  /**
+   * Writes the content of {@code obj} to {@code out}.
+   *
+   * @param out data output to write the object into
+   * @param obj object to serialize
+   * @throws IOException if writing to {@code out} fails
+   */
+  public void serialize(DataOutput out, A obj) throws IOException;
+
+  /**
+   * Reads back an object written by {@link #serialize(DataOutput, Object)}.
+   *
+   * @param in data input to read serialized data from
+   * @return deserialized object
+   * @throws IOException if reading from {@code in} fails
+   * @throws ClassNotFoundException if a class needed to rebuild the object
+   *           cannot be found
+   */
+  public A deserialize(DataInput in) throws IOException, ClassNotFoundException;
+
+}
Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/Storage.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/Storage.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/Storage.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/Storage.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Page-oriented storage back end. Data is read and written in whole pages of
+ * {@link #PAGE_SIZE} bytes, addressed by page number.
+ */
+public interface Storage {
+
+  /**
+   * Bit shift used to calculate the page size. If you want to change the page
+   * size, change it here.
+   * <p>
+   * {@code 1<<9 = 512}, {@code 1<<10 = 1024}, {@code 1<<11 = 2048},
+   * {@code 1<<12 = 4096}
+   */
+  int PAGE_SIZE_SHIFT = 12;
+
+  /**
+   * The length of a single page in bytes, derived from
+   * {@link #PAGE_SIZE_SHIFT}.
+   * <p>
+   * !!! DO NOT MODIFY THIS DIRECTLY !!!
+   */
+  int PAGE_SIZE = 1 << PAGE_SIZE_SHIFT;
+
+  /**
+   * Use {@code val & OFFSET_MASK} to quickly get the offset within a page.
+   */
+  long OFFSET_MASK = 0xFFFFFFFFFFFFFFFFL >>> (64 - Storage.PAGE_SIZE_SHIFT);
+
+  /**
+   * Writes one page.
+   *
+   * @param pageNumber page to write
+   * @param data page content (expected to hold {@link #PAGE_SIZE} bytes)
+   */
+  void write(long pageNumber, ByteBuffer data) throws IOException;
+
+  /**
+   * Reads one page. Implementations in this package return clean (empty)
+   * page data for pages that were never written; the returned buffer may be
+   * read-only.
+   */
+  ByteBuffer read(long pageNumber) throws IOException;
+
+  /** Closes all underlying resources. */
+  void forceClose() throws IOException;
+
+  /** Returns whether this storage was opened read-only. */
+  boolean isReadonly();
+
+  /**
+   * Opens the transaction log for reading. The disk implementations return
+   * {@code null} when no usable log exists.
+   */
+  DataInputStream readTransactionLog();
+
+  /** Deletes the transaction log, if there is one. */
+  void deleteTransactionLog();
+
+  /** Flushes written pages to the underlying storage device. */
+  void sync() throws IOException;
+
+  /**
+   * Creates the transaction log (the disk implementations truncate any
+   * existing one) and returns a stream to write it.
+   */
+  DataOutputStream openTransactionLog() throws IOException;
+
+  /** Deletes every file belonging to this storage, including the log. */
+  void deleteAllFiles() throws IOException;
+}
Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/StorageDisk.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/StorageDisk.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/StorageDisk.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/StorageDisk.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.OverlappingFileLockException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Storage which uses plain files on disk ({@link RandomAccessFile}) to store
+ * data. Pages with non-negative numbers go to the data files, pages with
+ * negative numbers to a separate set of translation files; file names and the
+ * number of pages per file come from {@link StorageDiskMapped}.
+ */
+public final class StorageDisk implements Storage {
+
+  private ArrayList<RandomAccessFile> rafs = new ArrayList<RandomAccessFile>();
+  private ArrayList<RandomAccessFile> rafsTranslation = new ArrayList<RandomAccessFile>();
+
+  private String fileName;
+
+  private boolean readonly;
+
+  /**
+   * Creates the storage. Unless it is read-only or locking is disabled, the
+   * first data file is opened (created if missing) and exclusively locked.
+   *
+   * @throws IOException if the first file cannot be opened or locked, e.g.
+   *           because another program or this JVM already holds the lock
+   */
+  public StorageDisk(String fileName, boolean readonly, boolean lockingDisabled)
+      throws IOException {
+    this.fileName = fileName;
+    this.readonly = readonly;
+    if (readonly || lockingDisabled) {
+      return;
+    }
+    // make sure the first file can be opened, and lock it
+    boolean locked;
+    try {
+      locked = getRaf(0).getChannel().tryLock() != null;
+    } catch (IOException e) {
+      throw new IOException("Could not lock DB file: " + fileName, e);
+    } catch (OverlappingFileLockException e) {
+      // the lock is already held within this JVM
+      throw new IOException("Could not lock DB file: " + fileName, e);
+    }
+    if (!locked) {
+      // tryLock() returns null instead of throwing when another program holds
+      // an overlapping lock; release our handle and fail the same way
+      forceClose();
+      throw new IOException("Could not lock DB file: " + fileName);
+    }
+  }
+
+  /**
+   * Returns the file holding {@code pageNumber}, opening it on first use
+   * ("r" mode when read-only, "rw" otherwise).
+   */
+  RandomAccessFile getRaf(long pageNumber) throws IOException {
+    int fileNumber = (int) (Math.abs(pageNumber) / StorageDiskMapped.PAGES_PER_FILE);
+
+    List<RandomAccessFile> c = pageNumber >= 0 ? rafs : rafsTranslation;
+
+    // grow the list so that index fileNumber exists
+    while (c.size() <= fileNumber) {
+      c.add(null);
+    }
+
+    RandomAccessFile ret = c.get(fileNumber);
+    if (ret == null) {
+      String name = StorageDiskMapped.makeFileName(fileName, pageNumber,
+          fileNumber);
+      ret = new RandomAccessFile(name, readonly ? "r" : "rw");
+      c.set(fileNumber, ret);
+    }
+    return ret;
+  }
+
+  /** Byte offset of {@code pageNumber} inside the file returned by getRaf. */
+  private static long offsetInFile(long pageNumber) {
+    return (Math.abs(pageNumber) % StorageDiskMapped.PAGES_PER_FILE) * PAGE_SIZE;
+  }
+
+  /**
+   * Writes one page.
+   *
+   * @throws IllegalArgumentException if {@code data} does not have a capacity
+   *           of exactly PAGE_SIZE bytes
+   */
+  @Override
+  public void write(long pageNumber, ByteBuffer data) throws IOException {
+    if (data.capacity() != PAGE_SIZE)
+      throw new IllegalArgumentException("page buffer capacity must be "
+          + PAGE_SIZE + " but was " + data.capacity());
+
+    RandomAccessFile file = getRaf(pageNumber);
+    file.seek(offsetInFile(pageNumber));
+
+    if (data.hasArray()) {
+      // honour arrayOffset() so sliced heap buffers write their own bytes
+      file.write(data.array(), data.arrayOffset(), PAGE_SIZE);
+    } else {
+      // direct or read-only buffers expose no array(); copy the page without
+      // disturbing the caller's position/limit
+      byte[] copy = new byte[PAGE_SIZE];
+      ByteBuffer view = data.duplicate();
+      view.clear();
+      view.get(copy);
+      file.write(copy);
+    }
+  }
+
+  /**
+   * Reads one page into a fresh heap buffer. Bytes beyond the end of the file
+   * are filled from {@code PageFile.CLEAN_DATA}.
+   */
+  @Override
+  public ByteBuffer read(long pageNumber) throws IOException {
+    ByteBuffer buffer = ByteBuffer.allocate(PAGE_SIZE);
+    byte[] bytes = buffer.array();
+
+    RandomAccessFile file = getRaf(pageNumber);
+    file.seek(offsetInFile(pageNumber));
+    int pos = 0;
+    while (pos < PAGE_SIZE) {
+      int read = file.read(bytes, pos, PAGE_SIZE - pos);
+      if (read == -1) {
+        // past end of file: pad the rest of the page with clean data
+        System.arraycopy(PageFile.CLEAN_DATA, 0, bytes, pos, PAGE_SIZE - pos);
+        break;
+      }
+      pos += read;
+    }
+    return buffer;
+  }
+
+  static final String transaction_log_file_extension = ".t";
+
+  /** Creates (truncating) the transaction log file and returns a stream to it. */
+  @Override
+  public DataOutputStream openTransactionLog() throws IOException {
+    String logName = fileName + transaction_log_file_extension;
+    final FileOutputStream fileOut = new FileOutputStream(logName);
+    return new DataOutputStream(new BufferedOutputStream(fileOut)) {
+
+      // default implementation of flush on FileOutputStream does nothing,
+      // so we use a little workaround to make sure data are really flushed
+      @Override
+      public void flush() throws IOException {
+        super.flush();
+        fileOut.flush();
+        fileOut.getFD().sync();
+      }
+    };
+  }
+
+  @Override
+  public void deleteAllFiles() {
+    deleteTransactionLog();
+    StorageDiskMapped.deleteFiles(fileName);
+  }
+
+  /**
+   * Synchronizes all open files with the underlying device.
+   */
+  @Override
+  public void sync() throws IOException {
+    for (RandomAccessFile file : rafs)
+      if (file != null)
+        file.getFD().sync();
+    for (RandomAccessFile file : rafsTranslation)
+      if (file != null)
+        file.getFD().sync();
+  }
+
+  @Override
+  public void forceClose() throws IOException {
+    for (RandomAccessFile f : rafs) {
+      if (f != null)
+        f.close();
+    }
+    rafs = null;
+    for (RandomAccessFile f : rafsTranslation) {
+      if (f != null)
+        f.close();
+    }
+    rafsTranslation = null;
+  }
+
+  /**
+   * Opens the transaction log for reading, positioned after its header.
+   *
+   * @return the stream, or {@code null} if there is no log or it is empty or
+   *         truncated (such a log file is deleted)
+   * @throws Error if the log starts with an unexpected magic number
+   */
+  @Override
+  public DataInputStream readTransactionLog() {
+
+    File logFile = new File(fileName + transaction_log_file_extension);
+    if (!logFile.exists())
+      return null;
+    if (logFile.length() == 0) {
+      logFile.delete();
+      return null;
+    }
+
+    DataInputStream ois;
+    try {
+      ois = new DataInputStream(new BufferedInputStream(new FileInputStream(
+          logFile)));
+    } catch (FileNotFoundException e) {
+      // the file existed a moment ago; treat it as "no log" and move on
+      return null;
+    }
+
+    short header;
+    try {
+      header = ois.readShort();
+    } catch (IOException e) {
+      // corrupted/empty log file: close it first so the delete can succeed
+      closeQuietly(ois);
+      logFile.delete();
+      return null;
+    }
+    if (header != Magic.LOGFILE_HEADER) {
+      closeQuietly(ois);
+      throw new Error("Bad magic on log file");
+    }
+    return ois;
+  }
+
+  /** Closes {@code in}, ignoring failures; used only on error paths. */
+  private static void closeQuietly(DataInputStream in) {
+    try {
+      in.close();
+    } catch (IOException ignored) {
+      // best effort: the stream is being discarded anyway
+    }
+  }
+
+  @Override
+  public void deleteTransactionLog() {
+    File logFile = new File(fileName + transaction_log_file_extension);
+    if (logFile.exists())
+      logFile.delete();
+  }
+
+  /** Returns the read-only flag this storage was created with. */
+  @Override
+  public boolean isReadonly() {
+    return readonly;
+  }
+}
Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/StorageDiskMapped.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/StorageDiskMapped.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/StorageDiskMapped.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/StorageDiskMapped.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.OverlappingFileLockException;
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.List;
+
+/**
+ * Disk storage which uses mapped buffers
+ */
+public final class StorageDiskMapped implements Storage {
+
+ static final String IDR = ".i";
+
+ static final String DBR = ".d";
+
+ /**
+ * Maximal number of pages in single file. Calculated so that each file will
+ * have 1 GB
+ */
+ final static long PAGES_PER_FILE = (1024 * 1024 * 1024) >>> Storage.PAGE_SIZE_SHIFT;
+
+ private ArrayList<FileChannel> channels = new ArrayList<FileChannel>();
+ private ArrayList<FileChannel> channelsTranslation = new ArrayList<FileChannel>();
+ private IdentityHashMap<FileChannel, MappedByteBuffer> buffers = new IdentityHashMap<FileChannel, MappedByteBuffer>();
+
+ private String fileName;
+ private boolean transactionsDisabled;
+ private boolean readonly;
+
+ public StorageDiskMapped(String fileName, boolean readonly,
+ boolean transactionsDisabled, boolean lockingDisabled) throws IOException {
+ this.fileName = fileName;
+ this.transactionsDisabled = transactionsDisabled;
+ this.readonly = readonly;
+ // make sure first file can be opened
+ // lock it
+ try {
+ if (!lockingDisabled)
+ getChannel(0).lock();
+ } catch (IOException e) {
+ throw new IOException("Could not lock DB file: " + fileName, e);
+ } catch (OverlappingFileLockException e) {
+ throw new IOException("Could not lock DB file: " + fileName, e);
+ }
+
+ }
+
+ private FileChannel getChannel(long pageNumber) throws IOException {
+ int fileNumber = (int) (Math.abs(pageNumber) / PAGES_PER_FILE);
+
+ List<FileChannel> c = pageNumber >= 0 ? channels : channelsTranslation;
+
+ // increase capacity of array lists if needed
+ for (int i = c.size(); i <= fileNumber; i++) {
+ c.add(null);
+ }
+
+ FileChannel ret = c.get(fileNumber);
+ if (ret == null) {
+ String name = makeFileName(fileName, pageNumber, fileNumber);
+ ret = new RandomAccessFile(name, "rw").getChannel();
+ c.set(fileNumber, ret);
+ buffers.put(ret, ret.map(FileChannel.MapMode.READ_WRITE, 0, ret.size()));
+ }
+ return ret;
+ }
+
+ static String makeFileName(String fileName, long pageNumber, int fileNumber) {
+ return fileName + (pageNumber >= 0 ? DBR : IDR) + "." + fileNumber;
+ }
+
+ @Override
+ public void write(long pageNumber, ByteBuffer data) throws IOException {
+ if (transactionsDisabled && data.isDirect()) {
+ // if transactions are disabled and this buffer is direct,
+ // changes written into buffer are directly reflected in file.
+ // so there is no need to write buffer second time
+ return;
+ }
+
+ FileChannel f = getChannel(pageNumber);
+ int offsetInFile = (int) ((Math.abs(pageNumber) % PAGES_PER_FILE) * PAGE_SIZE);
+ MappedByteBuffer b = buffers.get(f);
+ if (b.limit() <= offsetInFile) {
+
+ // remapping buffer for each newly added page would be slow,
+ // so allocate new size in chunks
+ int increment = Math.min(PAGE_SIZE * 1024, offsetInFile / 10);
+ increment -= increment % PAGE_SIZE;
+
+ long newFileSize = offsetInFile + PAGE_SIZE + increment;
+ newFileSize = Math.min(PAGES_PER_FILE * PAGE_SIZE, newFileSize);
+
+ // expand file size
+ f.position(newFileSize - 1);
+ f.write(ByteBuffer.allocate(1));
+ // unmap old buffer
+ unmapBuffer(b);
+ // remap buffer
+ b = f.map(FileChannel.MapMode.READ_WRITE, 0, newFileSize);
+ buffers.put(f, b);
+ }
+
+ // write into buffer
+ b.position(offsetInFile);
+ data.rewind();
+ b.put(data);
+ }
+
+ private static void unmapBuffer(MappedByteBuffer b) {
+ // trying to GC, because cleaner is no public and stable API
+ System.gc();
+ // if (b != null) {
+ // Cleaner cleaner = ((sun.nio.ch.DirectBuffer) b).cleaner();
+ // if (cleaner != null)
+ // cleaner.clean();
+ // }
+ }
+
+ @Override
+ public ByteBuffer read(long pageNumber) throws IOException {
+ FileChannel f = getChannel(pageNumber);
+ int offsetInFile = (int) ((Math.abs(pageNumber) % PAGES_PER_FILE) * PAGE_SIZE);
+ MappedByteBuffer b = buffers.get(f);
+
+ if (b == null) { // not mapped yet
+ b = f.map(FileChannel.MapMode.READ_WRITE, 0, f.size());
+ }
+
+ // check buffers size
+ if (b.limit() <= offsetInFile) {
+ // file is smaller, return empty data
+ return ByteBuffer.wrap(PageFile.CLEAN_DATA).asReadOnlyBuffer();
+ }
+
+ b.position(offsetInFile);
+ ByteBuffer ret = b.slice();
+ ret.limit(PAGE_SIZE);
+ if (!transactionsDisabled || readonly) {
+ // changes written into buffer will be directly written into file
+ // so we need to protect buffer from modifications
+ ret = ret.asReadOnlyBuffer();
+ }
+ return ret;
+ }
+
+ @Override
+ public void forceClose() throws IOException {
+ for (FileChannel f : channels) {
+ if (f == null)
+ continue;
+ f.close();
+ unmapBuffer(buffers.get(f));
+ }
+ for (FileChannel f : channelsTranslation) {
+ if (f == null)
+ continue;
+ f.close();
+ unmapBuffer(buffers.get(f));
+ }
+
+ channels = null;
+ channelsTranslation = null;
+ buffers = null;
+ }
+
+ @Override
+ public void sync() throws IOException {
+ for (MappedByteBuffer b : buffers.values()) {
+ b.force();
+ }
+ }
+
+ @Override
+ public DataOutputStream openTransactionLog() throws IOException {
+ String logName = fileName + StorageDisk.transaction_log_file_extension;
+ final FileOutputStream fileOut = new FileOutputStream(logName);
+ return new DataOutputStream(new BufferedOutputStream(fileOut)) {
+
+ // default implementation of flush on FileOutputStream does nothing,
+ // so we use little workaround to make sure that data were really flushed
+ @Override
+ public void flush() throws IOException {
+ super.flush();
+ fileOut.flush();
+ fileOut.getFD().sync();
+ }
+ };
+ }
+
+ @Override
+ public void deleteAllFiles() throws IOException {
+ deleteTransactionLog();
+ deleteFiles(fileName);
+ }
+
+ static void deleteFiles(String fileName) {
+ for (int i = 0; true; i++) {
+ String name = makeFileName(fileName, +1, i);
+ File f = new File(name);
+ boolean exists = f.exists();
+ if (exists && !f.delete())
+ f.deleteOnExit();
+ if (!exists)
+ break;
+ }
+ for (int i = 0; true; i++) {
+ String name = makeFileName(fileName, -1, i);
+ File f = new File(name);
+ boolean exists = f.exists();
+ if (exists && !f.delete())
+ f.deleteOnExit();
+ if (!exists)
+ break;
+ }
+ }
+
+ @Override
+ public DataInputStream readTransactionLog() {
+
+ File logFile = new File(fileName
+ + StorageDisk.transaction_log_file_extension);
+ if (!logFile.exists())
+ return null;
+ if (logFile.length() == 0) {
+ logFile.delete();
+ return null;
+ }
+
+ DataInputStream ois = null;
+ try {
+ ois = new DataInputStream(new BufferedInputStream(new FileInputStream(
+ logFile)));
+ } catch (FileNotFoundException e) {
+ // file should exists, we check for its presents just a miliseconds
+ // yearlier, anyway move on
+ return null;
+ }
+
+ try {
+ if (ois.readShort() != Magic.LOGFILE_HEADER)
+ throw new Error("Bad magic on log file");
+ } catch (IOException e) {
+ // corrupted/empty logfile
+ logFile.delete();
+ return null;
+ }
+ return ois;
+ }
+
+ @Override
+ public void deleteTransactionLog() {
+ File logFile = new File(fileName
+ + StorageDisk.transaction_log_file_extension);
+ if (logFile.exists())
+ logFile.delete();
+ }
+
+ @Override
+ public boolean isReadonly() {
+ return readonly;
+ }
+
+}
Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/StorageMemory.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/StorageMemory.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/StorageMemory.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/StorageMemory.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOError;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Storage which keeps all data in memory. Data are lost after storage is
+ * closed.
+ */
+public final class StorageMemory implements Storage {
+
+ private LongHashMap<byte[]> pages = new LongHashMap<byte[]>();
+ private boolean transactionsDisabled;
+
+ StorageMemory(boolean transactionsDisabled) {
+ this.transactionsDisabled = transactionsDisabled;
+ }
+
+ @Override
+ public ByteBuffer read(long pageNumber) throws IOException {
+
+ byte[] data = pages.get(pageNumber);
+ if (data == null) {
+ // out of bounds, so just return empty data
+ return ByteBuffer.wrap(PageFile.CLEAN_DATA).asReadOnlyBuffer();
+ } else {
+ ByteBuffer b = ByteBuffer.wrap(data);
+ if (!transactionsDisabled)
+ return b.asReadOnlyBuffer();
+ else
+ return b;
+ }
+
+ }
+
+ @Override
+ public void write(long pageNumber, ByteBuffer data) throws IOException {
+ if (data.capacity() != PAGE_SIZE)
+ throw new IllegalArgumentException();
+
+ byte[] b = pages.get(pageNumber);
+
+ if (transactionsDisabled && data.hasArray() && data.array() == b) {
+ // already putted directly into array
+ return;
+ }
+
+ if (b == null)
+ b = new byte[PAGE_SIZE];
+
+ data.position(0);
+ data.get(b, 0, PAGE_SIZE);
+ pages.put(pageNumber, b);
+ }
+
+ @Override
+ public void sync() throws IOException {
+ }
+
+ @Override
+ public void forceClose() throws IOException {
+ pages = null;
+ }
+
+ private ByteArrayOutputStream transLog;
+
+ @Override
+ public DataInputStream readTransactionLog() {
+ if (transLog == null)
+ return null;
+ DataInputStream ret = new DataInputStream(new ByteArrayInputStream(
+ transLog.toByteArray()));
+ // read stream header
+ try {
+ ret.readShort();
+ } catch (IOException e) {
+ throw new IOError(e);
+ }
+ return ret;
+ }
+
+ @Override
+ public void deleteTransactionLog() {
+ transLog = null;
+ }
+
+ @Override
+ public DataOutputStream openTransactionLog() throws IOException {
+ if (transLog == null)
+ transLog = new ByteArrayOutputStream();
+ return new DataOutputStream(transLog);
+ }
+
+ @Override
+ public void deleteAllFiles() throws IOException {
+ }
+
+ @Override
+ public boolean isReadonly() {
+ return false;
+ }
+}
Added: hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/BTreeBench.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/BTreeBench.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/BTreeBench.java (added)
+++ hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/BTreeBench.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.io.IOException;
+import java.util.Enumeration;
+import java.util.Hashtable;
+
+/**
+ * Random insertion/removal test for B+Tree data structure.
+ */
+public class BTreeBench extends TestCaseWithTestFile {
+
+ DBAbstract db;
+
+ /**
+ * Test w/o compression or specialized key or value serializers.
+ *
+ * @throws IOException
+ */
+ public void test_001() throws IOException {
+ db = newDBCache();
+ BTree<Long, Long> tree = BTree.createInstance(db);
+ doTest(db, tree, 5001);
+ db.close();
+ }
+
+ public static void doTest(DB db, BTree<Long, Long> tree, int ITERATIONS)
+ throws IOException {
+
+ long beginTime = System.currentTimeMillis();
+ Hashtable<Long, Long> hash = new Hashtable<Long, Long>();
+
+ for (int i = 0; i < ITERATIONS; i++) {
+ Long random = new Long(random(0, 64000));
+
+ if ((i % 5000) == 0) {
+ long elapsed = System.currentTimeMillis() - beginTime;
+ System.out.println("Iterations=" + i + " Objects=" + tree._entries
+ + ", elapsed=" + elapsed + "ms");
+ db.commit();
+ }
+ if (hash.get(random) == null) {
+ // System.out.println( "Insert " + random );
+ hash.put(random, random);
+ tree.insert(random, random, false);
+ } else {
+ // System.out.println( "Remove " + random );
+ hash.remove(random);
+ Object removed = tree.remove(random);
+ if ((removed == null) || (!removed.equals(random))) {
+ throw new IllegalStateException("Remove expected " + random + " got "
+ + removed);
+ }
+ }
+ // tree.assertOrdering();
+ compare(tree, hash);
+ }
+
+ }
+
+ static long random(int min, int max) {
+ return Math.round(Math.random() * (max - min)) + min;
+ }
+
+ static void compare(BTree<Long, Long> tree, Hashtable<Long, Long> hash)
+ throws IOException {
+ boolean failed = false;
+ Enumeration<Long> enumeration;
+
+ if (tree._entries != hash.size()) {
+ throw new IllegalStateException("Tree size " + tree._entries
+ + " Hash size " + hash.size());
+ }
+
+ enumeration = hash.keys();
+ while (enumeration.hasMoreElements()) {
+ Long key = enumeration.nextElement();
+ Long hashValue = hash.get(key);
+ Long treeValue = tree.get(key);
+ if (!hashValue.equals(treeValue)) {
+ System.out.println("Compare expected " + hashValue + " got "
+ + treeValue);
+ failed = true;
+ }
+ }
+ if (failed) {
+ throw new IllegalStateException("Compare failed");
+ }
+ }
+
+}
Added: hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/BTreeKeyCompressionTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/BTreeKeyCompressionTest.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/BTreeKeyCompressionTest.java (added)
+++ hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/BTreeKeyCompressionTest.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.SortedMap;
+
+public class BTreeKeyCompressionTest extends TestCaseWithTestFile {
+
+ static final long size = (long) 1e5;
+
+ public void testExpand() throws IOException {
+ long init = Long.MAX_VALUE - size * 2;
+ String file = newTestFile();
+ DB db = new DBStore(file, false, false, false);
+ SortedMap<Long, String> map = db.createTreeMap("aa");
+ for (long i = init; i < init + size; i++) {
+ map.put(i, "");
+ }
+ db.commit();
+ db.defrag(true);
+ db.close();
+ long fileSize = new File(file + ".dbr.0").length() / 1024;
+ System.out.println("file size: " + fileSize);
+ assertTrue("file is too big, compression failed", fileSize < 1000);
+ }
+
+ public void testCornersLimitsLong() throws IOException {
+ DB db = newDBCache();
+ SortedMap<Long, String> map = db.createTreeMap("aa");
+ ArrayList<Long> ll = new ArrayList<Long>();
+ for (Long i = Long.MIN_VALUE; i < Long.MIN_VALUE + 1000; i++) {
+ map.put(i, "");
+ ll.add(i);
+ }
+ for (Long i = -1000l; i < 1000; i++) {
+ map.put(i, "");
+ ll.add(i);
+ }
+ for (Long i = Long.MAX_VALUE - 1000; i <= Long.MAX_VALUE && i > 0; i++) {
+ map.put(i, "");
+ ll.add(i);
+ }
+
+ db.commit();
+
+ db.clearCache();
+ for (Long i : ll) {
+ assertTrue("failed for " + i, map.containsKey(i));
+ }
+
+ assertTrue(!map.containsKey(Long.valueOf(Long.MIN_VALUE + 1000)));
+ assertTrue(!map.containsKey(Long.valueOf(Long.MIN_VALUE + 1001)));
+ assertTrue(!map.containsKey(Long.valueOf(-1001L)));
+ assertTrue(!map.containsKey(Long.valueOf(-1002L)));
+ assertTrue(!map.containsKey(Long.valueOf(1001L)));
+ assertTrue(!map.containsKey(Long.valueOf(1002L)));
+ assertTrue(!map.containsKey(Long.valueOf(Long.MAX_VALUE - 1001)));
+ assertTrue(!map.containsKey(Long.valueOf(Long.MAX_VALUE - 1002)));
+
+ db.close();
+ }
+
+ public void testCornersLimitsInt() throws IOException {
+ DB db = newDBCache();
+ SortedMap<Integer, String> map = db.createTreeMap("aa");
+ ArrayList<Integer> ll = new ArrayList<Integer>();
+ for (Integer i = Integer.MIN_VALUE; i < Integer.MIN_VALUE + 1000; i++) {
+ map.put(new Integer(i), "");
+ ll.add(new Integer(i));
+ }
+ for (Integer i = -1000; i < 1000; i++) {
+ map.put(i, "");
+ ll.add(i);
+ }
+ for (Integer i = Integer.MAX_VALUE - 1000; i <= Integer.MAX_VALUE && i > 0; i++) {
+ map.put(i, "");
+ ll.add(i);
+ }
+
+ db.commit();
+
+ db.clearCache();
+ for (Integer i : ll) {
+ assertTrue("failed for " + i, map.containsKey(i));
+ }
+
+ assertTrue(!map.containsKey(Integer.valueOf(Integer.MIN_VALUE + 1000)));
+ assertTrue(!map.containsKey(Integer.valueOf(Integer.MIN_VALUE + 1001)));
+ assertTrue(!map.containsKey(Integer.valueOf(-1001)));
+ assertTrue(!map.containsKey(Integer.valueOf(-1002)));
+ assertTrue(!map.containsKey(Integer.valueOf(1001)));
+ assertTrue(!map.containsKey(Integer.valueOf(1002)));
+ assertTrue(!map.containsKey(Integer.valueOf(Integer.MAX_VALUE - 1001)));
+ assertTrue(!map.containsKey(Integer.valueOf(Integer.MAX_VALUE - 1002)));
+
+ db.close();
+ }
+
+ public void testStrings() throws IOException {
+ long init = Long.MAX_VALUE - size * 2;
+ String file = newTestFile();
+ DB db = new DBStore(file, false, false, false);
+ SortedMap<String, String> map = db.createTreeMap("aa");
+ for (long i = init; i < init + size / 10; i++) {
+ map.put("aaaaa" + i, "");
+ }
+ db.commit();
+ db.defrag(true);
+ db.close();
+ db = new DBStore(file, false, false, false);
+ map = db.getTreeMap("aa");
+ for (long i = init; i < init + size / 10; i++) {
+ assertTrue(map.containsKey("aaaaa" + i));
+ }
+
+ long fileSize = new File(file + ".dbr.0").length() / 1024;
+ System.out.println("file size with Strings: " + fileSize);
+ assertTrue("file is too big, compression failed", fileSize < 120);
+ }
+
+}
Added: hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/BTreeLeadingValuePackTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/BTreeLeadingValuePackTest.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/BTreeLeadingValuePackTest.java (added)
+++ hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/BTreeLeadingValuePackTest.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+public class BTreeLeadingValuePackTest extends TestCase {
+
+ public static class ByteArraySource {
+ byte[] last = new byte[0];
+ Random r;
+
+ public ByteArraySource(long seed) {
+ r = new Random(seed);
+ r.nextBytes(last);
+ }
+
+ public byte[] getBytesWithCommonPrefix(int len, int common) {
+ if (common > last.length)
+ common = last.length;
+ if (common > len)
+ common = len;
+
+ byte[] out = new byte[len];
+ System.arraycopy(last, 0, out, 0, common);
+ byte[] xtra = new byte[len - common];
+ r.nextBytes(xtra);
+ System.arraycopy(xtra, 0, out, common, xtra.length);
+
+ last = out;
+ return out;
+ }
+
+ }
+
+ private void doCompressUncompressTestFor(byte[][] groups) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+
+ // compress
+ for (int i = 0; i < groups.length; i++) {
+ BTreeNode.leadingValuePackWrite(dos, groups[i], i > 0 ? groups[i - 1]
+ : null, 0);
+ }
+
+ byte[] results = baos.toByteArray();
+
+ ByteArrayInputStream bais = new ByteArrayInputStream(results);
+ DataInputStream dis = new DataInputStream(bais);
+
+ byte[] previous = null;
+ for (int i = 0; i < groups.length; i++) {
+ previous = BTreeNode.leadingValuePackRead(dis, previous, 0);
+ assertTrue(Arrays.equals(groups[i], previous));
+ }
+
+ }
+
+ private byte[][] getIncrementingGroups(int groupCount, long seed,
+ int lenInit, int comInit, int lenIncr, int comIncr) {
+ ByteArraySource bap = new ByteArraySource(seed);
+ byte[][] groups = new byte[groupCount][];
+ for (int i = 0; i < groupCount; i++) {
+ groups[i] = bap.getBytesWithCommonPrefix(lenInit, comInit);
+ lenInit += lenIncr;
+ comInit += comIncr;
+ }
+ return groups;
+ }
+
+ public void testCompDecompEqualLenEqualCommon() throws IOException {
+ byte[][] groups = getIncrementingGroups(5, // number of groups
+ 1000, // seed
+ 50, // starting byte array length
+ 5, // starting common bytes
+ 0, // length increment
+ 0 // common bytes increment
+ );
+
+ doCompressUncompressTestFor(groups);
+ }
+
+ public void testCompDecompEqualLenIncrCommon() throws IOException {
+ byte[][] groups = getIncrementingGroups(5, // number of groups
+ 1000, // seed
+ 50, // starting byte array length
+ 5, // starting common bytes
+ 0, // length increment
+ 2 // common bytes increment
+ );
+
+ doCompressUncompressTestFor(groups);
+ }
+
+ public void testCompDecompEqualLenDecrCommon() throws IOException {
+ byte[][] groups = getIncrementingGroups(5, // number of groups
+ 1000, // seed
+ 50, // starting byte array length
+ 40, // starting common bytes
+ 0, // length increment
+ -2 // common bytes increment
+ );
+
+ doCompressUncompressTestFor(groups);
+ }
+
+ public void testCompDecompIncrLenEqualCommon() throws IOException {
+ byte[][] groups = getIncrementingGroups(5, // number of groups
+ 1000, // seed
+ 30, // starting byte array length
+ 25, // starting common bytes
+ 1, // length increment
+ 0 // common bytes increment
+ );
+
+ doCompressUncompressTestFor(groups);
+ }
+
+ public void testCompDecompDecrLenEqualCommon() throws IOException {
+ byte[][] groups = getIncrementingGroups(5, // number of groups
+ 1000, // seed
+ 50, // starting byte array length
+ 25, // starting common bytes
+ -1, // length increment
+ 0 // common bytes increment
+ );
+
+ doCompressUncompressTestFor(groups);
+ }
+
+ public void testCompDecompNoCommon() throws IOException {
+ byte[][] groups = getIncrementingGroups(5, // number of groups
+ 1000, // seed
+ 50, // starting byte array length
+ 0, // starting common bytes
+ -1, // length increment
+ 0 // common bytes increment
+ );
+
+ doCompressUncompressTestFor(groups);
+ }
+
+ public void testCompDecompNullGroups() throws IOException {
+ byte[][] groups = getIncrementingGroups(5, // number of groups
+ 1000, // seed
+ 50, // starting byte array length
+ 25, // starting common bytes
+ -1, // length increment
+ 0 // common bytes increment
+ );
+
+ groups[2] = null;
+ groups[4] = null;
+
+ doCompressUncompressTestFor(groups);
+ }
+
+}