You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ik...@apache.org on 2015/06/09 18:17:32 UTC
[2/2] git commit: updated refs/heads/trunk to 06a1084
[GIRAPH-1010] Add utilities to allow Kryo to serialize objects
Test Plan: mvn clean install
Reviewers: dionysis.logothetis, sergey.edunov, maja.kabiljo
Reviewed By: maja.kabiljo
Differential Revision: https://reviews.facebook.net/D39513
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/06a1084a
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/06a1084a
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/06a1084a
Branch: refs/heads/trunk
Commit: 06a1084af99ee46a03ebbbee53e3224ca8862132
Parents: 27661ba
Author: Igor Kabiljo <ik...@fb.com>
Authored: Wed Jun 3 13:35:20 2015 -0700
Committer: Igor Kabiljo <ik...@fb.com>
Committed: Tue Jun 9 09:17:06 2015 -0700
----------------------------------------------------------------------
giraph-core/pom.xml | 12 +
.../org/apache/giraph/utils/WritableUtils.java | 126 +++--
.../writable/kryo/DataInputWrapperStream.java | 59 +++
.../writable/kryo/DataOutputWrapperStream.java | 47 ++
.../apache/giraph/writable/kryo/HadoopKryo.java | 435 +++++++++++++++++
.../giraph/writable/kryo/KryoWritable.java | 40 ++
.../writable/kryo/KryoWritableWrapper.java | 110 +++++
.../giraph/writable/kryo/TransientRandom.java | 69 +++
.../kryo/markers/KryoIgnoreWritable.java | 32 ++
.../writable/kryo/markers/NonKryoWritable.java | 26 +
.../writable/kryo/markers/package-info.java | 21 +
.../giraph/writable/kryo/package-info.java | 23 +
.../serializers/ArraysAsListSerializer.java | 44 ++
.../CollectionsNCopiesSerializer.java | 52 ++
.../serializers/DirectWritableSerializer.java | 83 ++++
.../kryo/serializers/FastUtilSerializer.java | 476 +++++++++++++++++++
.../serializers/ReusableFieldSerializer.java | 57 +++
.../writable/kryo/serializers/package-info.java | 21 +
.../giraph/writable/kryo/KryoWritableTest.java | 195 ++++++++
.../writable/kryo/KryoWritableWrapperTest.java | 292 ++++++++++++
pom.xml | 33 ++
21 files changed, 2217 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/pom.xml
----------------------------------------------------------------------
diff --git a/giraph-core/pom.xml b/giraph-core/pom.xml
index 6fea1a3..4719a5d 100644
--- a/giraph-core/pom.xml
+++ b/giraph-core/pom.xml
@@ -542,6 +542,18 @@ under the License.
<groupId>org.python</groupId>
<artifactId>jython</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.esotericsoftware</groupId>
+ <artifactId>kryo</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>de.javakaffee</groupId>
+ <artifactId>kryo-serializers</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.objenesis</groupId>
+ <artifactId>objenesis</artifactId>
+ </dependency>
<!-- runtime dependency -->
<dependency>
http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
index be2ef9d..68ed89a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java
@@ -748,42 +748,6 @@ public class WritableUtils {
}
/**
- * Create a copy of Writable object, by serializing and deserializing it.
- *
- * @param reusableOut Reusable output stream to serialize into
- * @param reusableIn Reusable input stream to deserialize out of
- * @param original Original value of which to make a copy
- * @param conf Configuration
- * @param <T> Type of the object
- * @return Copy of the original value
- */
- public static <T extends Writable> T createCopy(
- UnsafeByteArrayOutputStream reusableOut,
- UnsafeReusableByteArrayInput reusableIn, T original,
- ImmutableClassesGiraphConfiguration conf) {
- T copy = (T) createWritable(original.getClass(), conf);
-
- try {
- reusableOut.reset();
- original.write(reusableOut);
- reusableIn.initialize(
- reusableOut.getByteArray(), 0, reusableOut.getPos());
- copy.readFields(reusableIn);
-
- if (reusableIn.available() != 0) {
- throw new RuntimeException("Serialization of " +
- original.getClass() + " encountered issues, " +
- reusableIn.available() + " bytes left to be read");
- }
- } catch (IOException e) {
- throw new IllegalStateException(
- "IOException occurred while trying to create a copy " +
- original.getClass(), e);
- }
- return copy;
- }
-
- /**
* Writes primitive int array of ints into output stream.
* Array can be null or empty.
* @param array array to be written
@@ -896,4 +860,94 @@ public class WritableUtils {
return null;
}
}
+
+
+ /**
+ * Copy {@code from} into {@code to}, by serializing and deserializing it.
+ * Since it is creating streams inside, it's mostly useful for
+ * tests/non-performant code.
+ *
+ * @param from Object to copy from
+ * @param to Object to copy into
+ * @param <T> Type of the object
+ */
+ public static <T extends Writable> void copyInto(T from, T to) {
+ copyInto(from, to, false);
+ }
+
+ /**
+ * Copy {@code from} into {@code to}, by serializing and deserializing it.
+ * Since it is creating streams inside, it's mostly useful for
+ * tests/non-performant code.
+ *
+ * @param from Object to copy from
+ * @param to Object to copy into
+ * @param checkOverRead if true, will add one more byte at the end of writing,
+ * to make sure read is not touching it. Useful for tests
+ * @param <T> Type of the object
+ */
+ public static <T extends Writable> void copyInto(
+ T from, T to, boolean checkOverRead) {
+ try {
+ if (from.getClass() != to.getClass()) {
+ throw new RuntimeException(
+ "Trying to copy from " + from.getClass() +
+ " into " + to.getClass());
+ }
+
+ UnsafeByteArrayOutputStream out = new UnsafeByteArrayOutputStream();
+ from.write(out);
+ if (checkOverRead) {
+ out.writeByte(0);
+ }
+
+ UnsafeByteArrayInputStream in =
+ new UnsafeByteArrayInputStream(out.getByteArray(), 0, out.getPos());
+ to.readFields(in);
+
+ if (in.available() != (checkOverRead ? 1 : 0)) {
+ throw new RuntimeException(
+ "Serialization encountered issues with " + from.getClass() + ", " +
+ (in.available() - (checkOverRead ? 1 : 0)) + " fewer bytes read");
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Create a copy of Writable object, by serializing and deserializing it.
+ *
+ * @param reusableOut Reusable output stream to serialize into
+ * @param reusableIn Reusable input stream to deserialize out of
+ * @param original Original value of which to make a copy
+ * @param conf Configuration
+ * @param <T> Type of the object
+ * @return Copy of the original value
+ */
+ public static <T extends Writable> T createCopy(
+ UnsafeByteArrayOutputStream reusableOut,
+ UnsafeReusableByteArrayInput reusableIn, T original,
+ ImmutableClassesGiraphConfiguration conf) {
+ T copy = (T) createWritable(original.getClass(), conf);
+
+ try {
+ reusableOut.reset();
+ original.write(reusableOut);
+ reusableIn.initialize(
+ reusableOut.getByteArray(), 0, reusableOut.getPos());
+ copy.readFields(reusableIn);
+
+ if (reusableIn.available() != 0) {
+ throw new RuntimeException("Serialization of " +
+ original.getClass() + " encountered issues, " +
+ reusableIn.available() + " bytes left to be read");
+ }
+ } catch (IOException e) {
+ throw new IllegalStateException(
+ "IOException occurred while trying to create a copy " +
+ original.getClass(), e);
+ }
+ return copy;
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/DataInputWrapperStream.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/DataInputWrapperStream.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/DataInputWrapperStream.java
new file mode 100644
index 0000000..e0e23b0
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/DataInputWrapperStream.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.writable.kryo;
+
+import java.io.DataInput;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Thin wrapper around DataInput so it can be used as an
+ * {@link java.io.InputStream}
+ *
+ * For use with {@link com.esotericsoftware.kryo.io.Input}
+ */
+public class DataInputWrapperStream extends InputStream {
+ /** Wrapped DataInput object */
+ private DataInput in;
+
+ public void setDataInput(DataInput in) {
+ this.in = in;
+ }
+
+ @Override
+ public int read() throws IOException {
+ try {
+ return in.readByte() & 0xFF;
+ } catch (EOFException e) {
+ throw new RuntimeException(
+ "Chunked input should never read more than chunked output wrote", e);
+ }
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ try {
+ in.readFully(b, off, len);
+ return len;
+ } catch (EOFException e) {
+ throw new RuntimeException(
+ "Chunked input should never read more than chunked output wrote", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/DataOutputWrapperStream.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/DataOutputWrapperStream.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/DataOutputWrapperStream.java
new file mode 100644
index 0000000..3518f67
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/DataOutputWrapperStream.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.writable.kryo;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Thin wrapper around a DataOutput so it can be used as an
+ * {@link java.io.OutputStream}
+ *
+ * For use with {@link com.esotericsoftware.kryo.io.Output})
+ */
+public class DataOutputWrapperStream extends OutputStream {
+ /** Wrapped DataOutput object */
+ private DataOutput out;
+
+ public void setDataOutput(DataOutput out) {
+ this.out = out;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ out.write(b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ out.write(b, off, len);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/HadoopKryo.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/HadoopKryo.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/HadoopKryo.java
new file mode 100644
index 0000000..b4f2bfa
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/HadoopKryo.java
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.writable.kryo;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+
+import org.apache.giraph.conf.GiraphConfigurationSettable;
+import org.apache.giraph.types.ops.collections.Basic2ObjectMap;
+import org.apache.giraph.types.ops.collections.BasicArrayList;
+import org.apache.giraph.types.ops.collections.BasicSet;
+import org.apache.giraph.writable.kryo.markers.KryoIgnoreWritable;
+import org.apache.giraph.writable.kryo.markers.NonKryoWritable;
+import org.apache.giraph.writable.kryo.serializers.ArraysAsListSerializer;
+import org.apache.giraph.writable.kryo.serializers.CollectionsNCopiesSerializer;
+import org.apache.giraph.writable.kryo.serializers.DirectWritableSerializer;
+import org.apache.giraph.writable.kryo.serializers.FastUtilSerializer;
+import org.apache.giraph.writable.kryo.serializers.ReusableFieldSerializer;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.objenesis.strategy.StdInstantiatorStrategy;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.factories.SerializerFactory;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.InputChunked;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.io.OutputChunked;
+import com.esotericsoftware.kryo.pool.KryoCallback;
+import com.esotericsoftware.kryo.pool.KryoFactory;
+import com.esotericsoftware.kryo.pool.KryoPool;
+import com.esotericsoftware.kryo.serializers.ClosureSerializer;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.esotericsoftware.kryo.util.ObjectMap;
+import com.google.common.base.Preconditions;
+
+import de.javakaffee.kryoserializers.guava.ImmutableListSerializer;
+
+/**
+ * Kryo instance that provides serialization through DataInput/DataOutput
+ * that org.apache.hadoop.io.Writable uses.
+ *
+ * All public APIs are static.
+ *
+ * It extends Kryo to reuse KryoPool functionality, but have additional needed
+ * objects cached as well. If we move to ThreadLocal or other caching
+ * technique, we can use composition, instead of inheritance here.
+ */
+public class HadoopKryo extends Kryo {
+ /** Pool of reusable Kryo objects, since they are expensive to create */
+ private static final KryoPool KRYO_POOL = new KryoPool.Builder(
+ new KryoFactory() {
+ @Override
+ public Kryo create() {
+ return createKryo();
+ }
+ }).build();
+
+ /**
+ * List of interfaces/parent classes that will not be allowed to be
+ * serialized, together with explanation of why, that will be shown
+ * when throwing such exception
+ */
+ private static final Map<Class<?>, String> NON_SERIALIZABLE;
+
+ static {
+ NON_SERIALIZABLE = new LinkedHashMap<>();
+ NON_SERIALIZABLE.put(
+ NonKryoWritable.class,
+ "it is marked to not allow serialization, " +
+ "look at the class for more details");
+ NON_SERIALIZABLE.put(
+ KryoWritableWrapper.class, "recursion is dissallowed");
+ NON_SERIALIZABLE.put(
+ Configuration.class,
+ "it cannot be supported since it contains ClassLoader");
+ NON_SERIALIZABLE.put(
+ GiraphConfigurationSettable.class, "configuration cannot be set");
+ NON_SERIALIZABLE.put(
+ Configurable.class, "configuration cannot be set");
+ NON_SERIALIZABLE.put(
+ Random.class,
+ "it should be rarely serialized, since it would create same stream " +
+ "of numbers everywhere, use TransientRandom instead");
+ }
+
+ // Use chunked streams, so within same stream we can use both kryo and
+ // non-kryo serialization.
+ /** Reusable Input object */
+ private final InputChunked input = new InputChunked(4096);
+ /** Reusable Output object */
+ private final OutputChunked output = new OutputChunked(4096);
+
+ /** Reusable DataInput wrapper stream */
+ private final DataInputWrapperStream dataInputWrapperStream =
+ new DataInputWrapperStream();
+ /** Reusable DataOutput wrapper stream */
+ private final DataOutputWrapperStream dataOutputWrapperStream =
+ new DataOutputWrapperStream();
+
+ /**
+ * Map of already initialized serializers used
+ * for readIntoObject/writeOutOfObject pair of methods
+ */
+ private final ObjectMap<Class<?>, ReusableFieldSerializer<Object>>
+ classToIntoSerializer = new ObjectMap<>();
+
+ /** Hide constructor, so all access go through pool of cached objects */
+ private HadoopKryo() {
+ }
+
+ // Public API:
+
+ /**
+ * Write type of given object and the object itself to the output stream.
+ * Inverse of readClassAndObject.
+ *
+ * @param out Output stream
+ * @param object Object to write
+ */
+ public static void writeClassAndObject(
+ final DataOutput out, final Object object) {
+ writeInternal(out, object, false);
+ }
+
+ /**
+ * Read object from the input stream, by reading first type of the object,
+ * and then all of it's fields.
+ * Inverse of writeClassAndObject.
+ *
+ * @param in Input stream
+ * @return Deserialized object
+ * @param <T> Type of the object being read
+ */
+ public static <T> T readClassAndObject(DataInput in) {
+ return readInternal(in, null, false);
+ }
+
+ /**
+ * Write an object to output, in a way that can be read by readIntoObject.
+ *
+ * @param out Output stream
+ * @param object Object to be written
+ */
+ public static void writeOutOfObject(
+ final DataOutput out, final Object object) {
+ writeInternal(out, object, true);
+ }
+
+ /**
+ * Reads an object, from input, into a given object,
+ * allowing object reuse.
+ * Inverse of writeOutOfObject.
+ *
+ * @param in Input stream
+ * @param object Object to fill from input
+ */
+ public static void readIntoObject(DataInput in, Object object) {
+ readInternal(in, object, true);
+ }
+
+ /**
+ * Create copy of the object, by magically recursively copying
+ * all of it's fields, keeping reference structures (like cycles)
+ *
+ * @param object Object to be copied
+ * @return Copy of the object.
+ * @param <T> Type of the object
+ */
+ public static <T> T createCopy(final T object) {
+ return KRYO_POOL.run(new KryoCallback<T>() {
+ @Override
+ public T execute(Kryo kryo) {
+ return kryo.copy(object);
+ }
+ });
+ }
+
+ // Private implementation:
+
+ /**
+ * Create new instance of HadoopKryo, properly initialized.
+ *
+ * @return New HadoopKryo instnace
+ */
+ private static HadoopKryo createKryo() {
+ HadoopKryo kryo = new HadoopKryo();
+
+ String version = System.getProperty("java.version");
+ char minor = version.charAt(2);
+ if (minor >= '8') {
+ try {
+ kryo.register(Class.forName("java.lang.invoke.SerializedLambda"));
+ kryo.register(Class.forName("com.esotericsoftware.kryo.Kryo$Closure"),
+ new ClosureSerializer());
+ } catch (ClassNotFoundException e) {
+ throw new IllegalStateException(
+ "Trying to use Kryo on >= Java 8 (" + version +
+ "), but unable to find needed classes", e);
+ }
+ }
+
+ kryo.register(Arrays.asList().getClass(), new ArraysAsListSerializer());
+ kryo.register(Collections.nCopies(1, new Object()).getClass(),
+ new CollectionsNCopiesSerializer());
+
+ ImmutableListSerializer.registerSerializers(kryo);
+
+ // TODO move guava version to 18.0, and remove this fix:
+ try {
+ kryo.register(
+ Class.forName("com.google.common.collect.RegularImmutableList"),
+ new ImmutableListSerializer());
+ } catch (ClassNotFoundException e) {
+ throw new IllegalStateException(
+ "Guava has RegularImmutableList missing", e);
+ }
+
+ // There are many fastutil classes, register them at the end,
+ // so they don't use up small registration numbers
+ FastUtilSerializer.registerAll(kryo);
+
+ kryo.setInstantiatorStrategy(new DefaultInstantiatorStrategy(
+ new StdInstantiatorStrategy()));
+
+ kryo.setDefaultSerializer(new SerializerFactory() {
+ @SuppressWarnings("rawtypes")
+ @Override
+ public Serializer makeSerializer(Kryo kryo, final Class<?> type) {
+ for (final Entry<Class<?>, String> entry :
+ NON_SERIALIZABLE.entrySet()) {
+ if (entry.getKey().isAssignableFrom(type)) {
+ // Allow Class object to be serialized, but not a live instance.
+ return new Serializer() {
+ @Override
+ public Object read(Kryo kryo, Input input, Class type) {
+ throw new RuntimeException("Cannot serialize " + type +
+ ". Objects being serialized cannot capture " +
+ entry.getKey() + " because " + entry.getValue() +
+ ". Either remove field in question" +
+ ", or make it transient (so that it isn't serialized)");
+ }
+
+ @Override
+ public void write(Kryo kryo, Output output, Object object) {
+ throw new RuntimeException("Cannot serialize " + type +
+ ". Objects being serialized cannot capture " +
+ entry.getKey() + " because " + entry.getValue() +
+ ". Either remove field in question" +
+ ", or make it transient (so that it isn't serialized)");
+ }
+ };
+ }
+ }
+
+ if (Writable.class.isAssignableFrom(type) &&
+ !KryoIgnoreWritable.class.isAssignableFrom(type) &&
+ // remove BasicSet, BasicArrayList and Basic2ObjectMap temporarily,
+ // for lack of constructors
+ !BasicSet.class.isAssignableFrom(type) &&
+ !BasicArrayList.class.isAssignableFrom(type) &&
+ !Basic2ObjectMap.class.isAssignableFrom(type)) {
+ // use the Writable method defined by the type
+ DirectWritableSerializer serializer = new DirectWritableSerializer();
+ return serializer;
+ } else {
+ FieldSerializer serializer = new FieldSerializer<>(kryo, type);
+ serializer.setIgnoreSyntheticFields(false);
+ return serializer;
+ }
+ }
+ });
+
+ return kryo;
+ }
+
+ /**
+ * Initialize reusable objects for reading from given DataInput.
+ *
+ * @param in Input stream
+ */
+ private void setDataInput(DataInput in) {
+ dataInputWrapperStream.setDataInput(in);
+ input.setInputStream(dataInputWrapperStream);
+ }
+
+ /**
+ * Initialize reusable objects for writing into given DataOutput.
+ *
+ * @param out Output stream
+ */
+ private void setDataOutput(DataOutput out) {
+ dataOutputWrapperStream.setDataOutput(out);
+ output.setOutputStream(dataOutputWrapperStream);
+ }
+
+ /**
+ * Get or create reusable serializer for given class.
+ *
+ * @param type Type of the object
+ * @return Serializer
+ */
+ private ReusableFieldSerializer<Object> getOrCreateReusableSerializer(
+ Class<?> type) {
+ ReusableFieldSerializer<Object> serializer =
+ classToIntoSerializer.get(type);
+ if (serializer == null) {
+ serializer = new ReusableFieldSerializer<>(this, type);
+ classToIntoSerializer.put(type, serializer);
+ }
+ return serializer;
+ }
+
+ /**
+ * Internal write implementation, that reuses HadoopKryo objects
+ * from the pool.
+ *
+ * @param out Output stream
+ * @param object Object to be written
+ * @param outOf whether we are writing reusable objects,
+ * or full objects with class name
+ */
+ private static void writeInternal(
+ final DataOutput out, final Object object, final boolean outOf) {
+ KRYO_POOL.run(new KryoCallback<Void>() {
+ @Override
+ public Void execute(Kryo kryo) {
+ HadoopKryo hkryo = (HadoopKryo) kryo;
+ hkryo.setDataOutput(out);
+
+ if (outOf) {
+ hkryo.writeOutOfObject(hkryo.output, object);
+ } else {
+ hkryo.writeClassAndObject(hkryo.output, object);
+ }
+
+ hkryo.output.endChunks();
+ hkryo.output.close();
+
+ return null;
+ }
+ });
+ }
+
+ /**
+ * Internal read implementation, that reuses HadoopKryo objects
+ * from the pool.
+ *
+ * @param in Input stream
+ * @param outObject Object to fill from input (if not null)
+ * @param into whether we are reading reusable objects,
+ * or full objects with class name
+ * @return Read object (new one, or same passed in if we use reusable)
+ * @param <T> Type of the object to read
+ */
+ @SuppressWarnings("unchecked")
+ private static <T> T readInternal(
+ final DataInput in, final T outObject, final boolean into) {
+ return KRYO_POOL.run(new KryoCallback<T>() {
+ @Override
+ public T execute(Kryo kryo) {
+ HadoopKryo hkryo = (HadoopKryo) kryo;
+ hkryo.setDataInput(in);
+
+ T object;
+ if (into) {
+ hkryo.readIntoObject(hkryo.input, outObject);
+ object = outObject;
+ } else {
+ object = (T) hkryo.readClassAndObject(hkryo.input);
+ }
+ hkryo.input.nextChunks();
+
+ hkryo.input.close();
+ return object;
+ }
+ });
+ }
+
+ /**
+ * Reads an object, from input, into a given object,
+ * allowing object reuse.
+ *
+ * @param input Input stream
+ * @param object Object to fill from input
+ */
+ private void readIntoObject(Input input, Object object) {
+ Preconditions.checkNotNull(object);
+
+ Class<?> type = object.getClass();
+ ReusableFieldSerializer<Object> serializer =
+ getOrCreateReusableSerializer(type);
+
+ serializer.setReadIntoObject(object);
+ Object result = readObject(input, type, serializer);
+
+ Preconditions.checkState(result == object);
+ }
+
+ /**
+ * Write an object to output, in a way that can be read
+ * using readIntoObject.
+ * @param output Output stream
+ * @param object Object to be written
+ */
+ private void writeOutOfObject(Output output, Object object) {
+ ReusableFieldSerializer<Object> serializer =
+ getOrCreateReusableSerializer(object.getClass());
+ writeObject(output, object, serializer);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritable.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritable.java
new file mode 100644
index 0000000..1e03888
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritable.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.writable.kryo;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.writable.kryo.markers.KryoIgnoreWritable;
+
+/**
+ * Class which you can extend to get all serialization/deserialization
+ * done automagically
+ */
+public abstract class KryoWritable implements KryoIgnoreWritable {
+ @Override
+ public final void write(DataOutput out) throws IOException {
+ HadoopKryo.writeOutOfObject(out, this);
+ }
+
+ @Override
+ public final void readFields(DataInput in) throws IOException {
+ HadoopKryo.readIntoObject(in, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritableWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritableWrapper.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritableWrapper.java
new file mode 100644
index 0000000..0f6e73f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritableWrapper.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.writable.kryo;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Generic wrapper object, making any object writable.
+ *
+ * Uses Kryo inside for serialization.
+ * Current configuration is not optimized for performance,
+ * but Writable interface doesn't allow much room for it.
+ *
+ * Note - Java8 lambdas need to implement Serializable to work.
+ *
+ * @param <T> Object type
+ */
+public class KryoWritableWrapper<T> implements Writable {
+ /** Wrapped object */
+ private T object;
+
+ /**
+ * Create wrapper given an object.
+ * @param object Object instance
+ */
+ public KryoWritableWrapper(T object) {
+ this.object = object;
+ }
+
+ /**
+ * Creates wrapper initialized with null.
+ */
+ public KryoWritableWrapper() {
+ }
+
+ /**
+ * Unwrap the object value
+ * @return Object value
+ */
+ public T get() {
+ return object;
+ }
+
+ /**
+ * Set wrapped object value
+ * @param object New object value
+ */
+ public void set(T object) {
+ this.object = object;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws java.io.IOException {
+ object = HadoopKryo.readClassAndObject(in);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ HadoopKryo.writeClassAndObject(out, object);
+ }
+
+ /**
+ * Returns Writable instance, wrapping given object only
+ * if it is not already writable.
+ *
+ * @param object Object to potentially wrap
+ * @return Writable object holding argument
+ */
+ public static Writable wrapIfNeeded(Object object) {
+ if (object instanceof Writable) {
+ return (Writable) object;
+ } else {
+ return new KryoWritableWrapper<>(object);
+ }
+ }
+
+ /**
+ * Unwrap Writable object if it was wrapped initially,
+ * inverse of wrapIfNeeded function.
+ * @param value Potentially wrapped value
+ * @return Original unwrapped value
+ * @param <T> Type of returned object.
+ */
+ public static <T> T unwrapIfNeeded(Writable value) {
+ if (value instanceof KryoWritableWrapper) {
+ return ((KryoWritableWrapper<T>) value).get();
+ } else {
+ return (T) value;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/TransientRandom.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/TransientRandom.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/TransientRandom.java
new file mode 100644
index 0000000..cd26be8
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/TransientRandom.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.writable.kryo;
+
+import java.util.Random;
+
+/**
+ * Transient Random class. Seed/state is not kept after
+ * serializing/deserializing.
+ *
+ * Within Blocks Framework - if we initialize Random within the Piece, when
+ * it's serialzied and copied to all workers and all threads - keeping seed
+ * would cause same series of random numbers to be generated everywhere.
+ *
+ * So this class is safe to be used in Pieces, while using regular Random
+ * class is forbidden to be serialized.
+ * Best approach would be to not have Random serialized, and create it on
+ * workers, where possible.
+ */
+public class TransientRandom {
+ /** Instance of random object */
+ private final transient Random random = new Random();
+
+ /**
+ * Get instance of Random
+ * @return Random instance
+ */
+ public Random get() {
+ return random;
+ }
+
+ /**
+ * Returns a pseudorandom, uniformly distributed {@code int} value
+ * between 0 (inclusive) and the specified value (exclusive), drawn from
+ * this random number generator's sequence.
+ *
+ * @param n Given upper limit
+ * @return pseudorandom integer number in [0, n) range.
+ */
+ public int nextInt(int n) {
+ return random.nextInt(n);
+ }
+
+ /**
+ * Returns the next pseudorandom, uniformly distributed
+ * {@code double} value between {@code 0.0} and
+ * {@code 1.0} from this random number generator's sequence.
+ *
+ * @return pseudorandom number in [0, 1)
+ */
+ public double nextDouble() {
+ return random.nextDouble();
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/markers/KryoIgnoreWritable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/markers/KryoIgnoreWritable.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/markers/KryoIgnoreWritable.java
new file mode 100644
index 0000000..58dd78d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/markers/KryoIgnoreWritable.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.writable.kryo.markers;
+
+import org.apache.hadoop.io.Writable;
+
+
+/**
+ * Marker interface, specifying that kryo should serialize it on it's own,
+ * and ignore actual Writable method implementations.
+ *
+ * If you are using HadoopKryo.writeOutOfObject/readIntoObject result is the
+ * same, and adding it allows wrapping Kryo context into writable context,
+ * and then wrapping it back into Kryo context.
+ */
+public interface KryoIgnoreWritable extends Writable {
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/markers/NonKryoWritable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/markers/NonKryoWritable.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/markers/NonKryoWritable.java
new file mode 100644
index 0000000..66840c5
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/markers/NonKryoWritable.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.writable.kryo.markers;
+
+
+/**
+ * Marker interface saying that class should never be serialized,
+ * that it is code error for it to be tried to be serialized.
+ */
+public interface NonKryoWritable {
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/markers/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/markers/package-info.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/markers/package-info.java
new file mode 100644
index 0000000..5fcfa72
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/markers/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Marker interfaces used by HadoopKryo to special-case serialization.
+ */
+package org.apache.giraph.writable.kryo.markers;
http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/package-info.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/package-info.java
new file mode 100644
index 0000000..a04bb65
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Utilities for using Kryo to efficiently serialize objects,
+ * and integrate with Writable interface.
+ * HadoopKryo is a main class doing serialization.
+ */
+package org.apache.giraph.writable.kryo;
http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/ArraysAsListSerializer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/ArraysAsListSerializer.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/ArraysAsListSerializer.java
new file mode 100644
index 0000000..3d66eb7
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/ArraysAsListSerializer.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.writable.kryo.serializers;
+
+import java.util.Arrays;
+import java.util.List;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Special serializer for Arrays.asList() as they can not be
+ * deserialized in a standard way.
+ * {@see
+ * https://groups.google.com/forum/#!msg/kryo-users/2lXTCEOSxA0/gLzIZRtaNCUJ}
+ */
+public class ArraysAsListSerializer extends Serializer<List> {
+ @Override
+ public void write(Kryo kryo, Output output, List object) {
+ kryo.writeObject(output, object.toArray(new Object[object.size()]));
+ }
+
+ @Override
+ public List read(Kryo kryo, Input input, Class<List> type) {
+ return Arrays.asList(kryo.readObject(input, Object[].class));
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/CollectionsNCopiesSerializer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/CollectionsNCopiesSerializer.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/CollectionsNCopiesSerializer.java
new file mode 100644
index 0000000..5460692
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/CollectionsNCopiesSerializer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.writable.kryo.serializers;
+
+import java.util.Collections;
+import java.util.List;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Special serializer for Collections.nCopies
+ *
+ * @param <T> Element type
+ */
+public class CollectionsNCopiesSerializer<T> extends Serializer<List<T>> {
+ @Override
+ public void write(Kryo kryo, Output output, List<T> object) {
+ output.writeInt(object.size(), true);
+ if (object.size() > 0) {
+ kryo.writeClassAndObject(output, object.get(0));
+ }
+ }
+
+ @Override
+ public List<T> read(Kryo kryo, Input input, Class<List<T>> type) {
+ int size = input.readInt(true);
+ if (size > 0) {
+ T object = (T) kryo.readClassAndObject(input);
+ return Collections.nCopies(size, object);
+ } else {
+ return Collections.emptyList();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/DirectWritableSerializer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/DirectWritableSerializer.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/DirectWritableSerializer.java
new file mode 100644
index 0000000..1967e6a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/DirectWritableSerializer.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.writable.kryo.serializers;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.hadoop.io.Writable;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * A custom Serializer that will call the Writable methods defined by the
+ * object itself to serialize the object, instead of Kryo auto-magically
+ * serializing
+ *
+ * @param <T> Object type, should implement Writable
+ */
+
+public class DirectWritableSerializer<T extends Writable>
+ extends Serializer<T> {
+
+ @Override
+ public void write(Kryo kryo, Output output, T object) {
+ try {
+ object.write(new DataOutputStream(output));
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "DirectWritableSerializer.write calling Writable method of class: " +
+ object.getClass().getName() + " encountered issues", e);
+ }
+ }
+
+ @Override
+ public T read(Kryo kryo, Input input, Class<T> type) {
+ try {
+ T object = create(kryo, input, type);
+ kryo.reference(object);
+ object.readFields(new DataInputStream(input));
+
+ return object;
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "DirectWritableSerializer.read calling Writable method of class: " +
+ type.getName() + " encountered issues", e);
+ }
+ }
+
+ /**
+ * Used by {@link #read(Kryo, Input, Class)} to create the new object.
+ * This can be overridden to customize object creation, eg to call a
+ * constructor with arguments. The default implementation
+ * uses {@link Kryo#newInstance(Class)}.
+ *
+ * @param kryo Kryo object instance
+ * @param input Input
+ * @param type Type of the class to create
+ * @return New instance of wanted type
+ */
+ protected T create(Kryo kryo, Input input, Class<T> type) {
+ return ReflectionUtils.newInstance(type);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/FastUtilSerializer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/FastUtilSerializer.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/FastUtilSerializer.java
new file mode 100644
index 0000000..0eb9676
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/FastUtilSerializer.java
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.writable.kryo.serializers;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+
+/**
+ * Kryo Serializer for Fastutil collection class.
+ * By default, because they extend boxed collections, are being serialized very
+ * inefficiently through a lot of temporary object creation.
+ *
+ * We are relying that fastutil classes are written to be correctly serialized
+ * with Java serialization, and have put transient on all array fields and are
+ * doing custom efficient serialization in writeObject/readObject methods.
+ * This Serializer then swaps ObjectOutputStream for all default fields with
+ * FieldSerializer, and then calls appropriate writeObject/readObject methods.
+ * We are also relying on defaultWriteObject/defaultReadObject being
+ * effectively called first within those methods
+ *
+ * @param <T> Object type
+ */
+public class FastUtilSerializer<T> extends Serializer<T> {
+ /** List of all types generated by fastutil */
+ private static final String[] PRIMITIVE_TYPES = new String[] {
+ "Boolean", "Byte", "Short", "Int", "Long", "Float", "Double", "Char",
+ "Object"};
+ /** List of all types used as keys in fastutil */
+ private static final String[] PRIMITIVE_KEY_TYPES = new String[] {
+ "Byte", "Short", "Int", "Long", "Float", "Double", "Char", "Object"};
+
+ /** Field serializer for this fastutil class */
+ private final FieldSerializer<T> fieldSerializer;
+
+ /** Handle to writeObject Method on this fastutil class*/
+ private final Method writeMethod;
+ /** Handle to readObject Method on this fastutil class*/
+ private final Method readMethod;
+ /** Reusable output stream wrapper */
+ private final FastUtilSerializer.FastutilKryoObjectOutputStream outputWrapper;
+ /** Reusable input stream wrapper */
+ private final FastUtilSerializer.FastutilKryoObjectInputStream inputWrapper;
+
+ /**
+ * Creates and initializes new serializer for a given fastutil class.
+ * @param kryo Kryo instance
+ * @param type Fastutil class
+ */
+ public FastUtilSerializer(Kryo kryo, Class<T> type) {
+ fieldSerializer = new FieldSerializer<>(kryo, type);
+ fieldSerializer.setIgnoreSyntheticFields(false);
+
+ try {
+ writeMethod = type.getDeclaredMethod(
+ "writeObject", ObjectOutputStream.class);
+ writeMethod.setAccessible(true);
+ readMethod = type.getDeclaredMethod(
+ "readObject", ObjectInputStream.class);
+ readMethod.setAccessible(true);
+ } catch (NoSuchMethodException e) {
+ throw new RuntimeException(
+ "Fastutil class " + type +
+ " doesn't have readObject/writeObject methods", e);
+ }
+
+ try {
+ outputWrapper = new FastutilKryoObjectOutputStream();
+ inputWrapper = new FastutilKryoObjectInputStream();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Register serializer for a given fastutil class.
+ * @param kryo Kryo instance
+ * @param fastutilClass Fastutil class
+ */
+ public static void register(Kryo kryo, Class<?> fastutilClass) {
+ kryo.register(fastutilClass, new FastUtilSerializer<>(kryo, fastutilClass));
+ }
+
+ /**
+ * Registers serializers for all possible fastutil classes.
+ *
+ * There are many fastutil classes, so it is recommended to call this
+ * function at the end, so they fastutil classes don't use up small
+ * registration numbers.
+ *
+ * @param kryo Kryo instance
+ */
+ public static void registerAll(Kryo kryo) {
+ registerArrayLists(kryo);
+ registerArrayBigList(kryo);
+ registerOpenHashSets(kryo);
+ registerArraySets(kryo);
+ registerRBTreeSets(kryo);
+ registerAVLTreeSets(kryo);
+ registerOpenHashMaps(kryo);
+ registerRBTreeMaps(kryo);
+ registerAVLTreeMaps(kryo);
+
+ // Note - HeapPriorityQueues don't extend boxed collection,
+ // and so they work out of the box correctly
+ }
+
+ /**
+ * Register all Fastutil ArrayLists.
+ *
+ * @param kryo Kryo instance
+ */
+ public static void registerArrayLists(Kryo kryo) {
+ registerAll(kryo, singleTypes(
+ "it.unimi.dsi.fastutil._t1_s._T1_ArrayList", PRIMITIVE_TYPES));
+ }
+
+ /**
+ * Register all Fastutil ArrayBigLists.
+ *
+ * @param kryo Kryo instance
+ */
+ public static void registerArrayBigList(Kryo kryo) {
+ registerAll(kryo, singleTypes(
+ "it.unimi.dsi.fastutil._t1_s._T1_BigArrayBigList", PRIMITIVE_TYPES));
+ }
+
+ /**
+ * Register all Fastutil OpenHashSets.
+ *
+ * @param kryo Kryo instance
+ */
+ public static void registerOpenHashSets(Kryo kryo) {
+ registerAll(kryo, singleTypes(
+ "it.unimi.dsi.fastutil._t1_s._T1_OpenHashSet", PRIMITIVE_TYPES));
+ }
+
+ /**
+ * Register all Fastutil ArraySets.
+ *
+ * @param kryo Kryo instance
+ */
+ public static void registerArraySets(Kryo kryo) {
+ registerAll(kryo, singleTypes(
+ "it.unimi.dsi.fastutil._t1_s._T1_ArraySet", PRIMITIVE_TYPES));
+ }
+
+ /**
+ * Register all Fastutil RBTreeSets.
+ *
+ * @param kryo Kryo instance
+ */
+ public static void registerRBTreeSets(Kryo kryo) {
+ registerAll(kryo, singleTypes(
+ "it.unimi.dsi.fastutil._t1_s._T1_RBTreeSet", PRIMITIVE_KEY_TYPES));
+ }
+
+ /**
+ * Register all Fastutil AVLTreeSets.
+ *
+ * @param kryo Kryo instance
+ */
+ public static void registerAVLTreeSets(Kryo kryo) {
+ registerAll(kryo, singleTypes(
+ "it.unimi.dsi.fastutil._t1_s._T1_AVLTreeSet", PRIMITIVE_KEY_TYPES));
+ }
+
+ /**
+ * Register all Fastutil OpenHashMaps.
+ *
+ * @param kryo Kryo instance
+ */
+ public static void registerOpenHashMaps(Kryo kryo) {
+ registerAll(kryo, doubleTypes(
+ "it.unimi.dsi.fastutil._t1_s._T1_2_T2_OpenHashMap",
+ PRIMITIVE_KEY_TYPES, PRIMITIVE_TYPES));
+ }
+
+ /**
+ * Register all Fastutil RBTreeMaps.
+ *
+ * @param kryo Kryo instance
+ */
+ public static void registerRBTreeMaps(Kryo kryo) {
+ registerAll(kryo, doubleTypes(
+ "it.unimi.dsi.fastutil._t1_s._T1_2_T2_RBTreeMap",
+ PRIMITIVE_KEY_TYPES, PRIMITIVE_TYPES));
+ }
+
+ /**
+ * Register all Fastutil AVLTreeMaps.
+ *
+ * @param kryo Kryo instance
+ */
+ public static void registerAVLTreeMaps(Kryo kryo) {
+ registerAll(kryo, doubleTypes(
+ "it.unimi.dsi.fastutil._t1_s._T1_2_T2_AVLTreeMap",
+ PRIMITIVE_KEY_TYPES, PRIMITIVE_TYPES));
+ }
+
+ /**
+ * Register all class from the list of classes.
+ *
+ * @param kryo Kryo instance
+ * @param types List of classes
+ */
+ private static void registerAll(Kryo kryo, ArrayList<Class<?>> types) {
+ for (Class<?> type : types) {
+ register(kryo, type);
+ }
+ }
+
+ /**
+ * Returns list of all classes that are generated by using given
+ * pattern, and replacing it with passed list of types.
+ * Pattern contains _t1_ and _T1_, for lowercase and actual name.
+ *
+ * @param pattern Given pattern
+ * @param types Given list of strings to replace into pattern
+ * @return List of all classes
+ */
+ private static ArrayList<Class<?>> singleTypes(
+ String pattern, String[] types) {
+ ArrayList<Class<?>> result = new ArrayList<>();
+
+ for (String type : types) {
+ try {
+ result.add(Class.forName(
+ pattern.replaceAll("_T1_", type).replaceAll(
+ "_t1_", type.toLowerCase())));
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(pattern + " " + type, e);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Returns list of all classes that are generated by using given
+ * pattern, and replacing it with passed list of types.
+ * Pattern contains two variable pairs: _t1_, _T1_ and _t2_, _T2_,
+ * in each pair one for lowercase and one for actual name.
+ *
+ * @param pattern Given pattern
+ * @param types1 Given list of strings to replace t1 into pattern
+ * @param types2 Given list of strings to replace t2 into pattern
+ * @return List of all classes
+ */
+ private static ArrayList<Class<?>> doubleTypes(
+ String pattern, String[] types1, String[] types2) {
+ ArrayList<Class<?>> result = new ArrayList<>();
+
+ for (String type1 : types1) {
+ for (String type2 : types2) {
+ try {
+ result.add(Class.forName(
+ pattern.replaceAll("_T1_", type1).replaceAll(
+ "_t1_", type1.toLowerCase())
+ .replaceAll("_T2_", type2).replaceAll(
+ "_t2_", type2.toLowerCase())));
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(pattern + " " + type1 + " " + type2, e);
+ }
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public void write(Kryo kryo, Output output, T object) {
+ fieldSerializer.write(kryo, output, object);
+
+ outputWrapper.set(output, kryo);
+ try {
+ writeMethod.invoke(object, outputWrapper);
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ throw new RuntimeException("writeObject failed", e);
+ }
+ }
+
+ @Override
+ public T read(Kryo kryo, Input input, Class<T> type) {
+ T result = fieldSerializer.read(kryo, input, type);
+
+ if (result != null) {
+ inputWrapper.set(input, kryo);
+ try {
+ readMethod.invoke(result, inputWrapper);
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ throw new RuntimeException("readObject failed", e);
+ }
+ }
+
+ return result;
+ }
+
+ /**
+ * Wrapper around ObjectOutputStream that ignores defaultWriteObject (assumes
+ * that needed logic was already executed before), and passes all other calls
+ * to Output
+ */
+ private static class FastutilKryoObjectOutputStream
+ extends ObjectOutputStream {
+ /** Output */
+ private Output output;
+ /** Kryo */
+ private Kryo kryo;
+
+ /** Constructor */
+ FastutilKryoObjectOutputStream() throws IOException {
+ super();
+ }
+
+ /**
+ * Setter
+ *
+ * @param output Output
+ * @param kryo kryo
+ */
+ public void set(Output output, Kryo kryo) {
+ this.output = output;
+ this.kryo = kryo;
+ }
+
+ @Override
+ public void defaultWriteObject() throws IOException {
+ }
+
+ @Override
+ public void writeBoolean(boolean val) throws IOException {
+ output.writeBoolean(val);
+ }
+
+ @Override
+ public void writeByte(int val) throws IOException {
+ output.writeByte(val);
+ }
+
+ @Override
+ public void writeShort(int val) throws IOException {
+ output.writeShort(val);
+ }
+
+ @Override
+ public void writeChar(int val) throws IOException {
+ output.writeChar((char) val);
+ }
+
+ @Override
+ public void writeInt(int val) throws IOException {
+ output.writeInt(val, false);
+ }
+
+ @Override
+ public void writeLong(long val) throws IOException {
+ output.writeLong(val, false);
+ }
+
+ @Override
+ public void writeFloat(float val) throws IOException {
+ output.writeFloat(val);
+ }
+
+ @Override
+ public void writeDouble(double val) throws IOException {
+ output.writeDouble(val);
+ }
+
+ @Override
+ protected void writeObjectOverride(Object obj) throws IOException {
+ kryo.writeClassAndObject(output, obj);
+ }
+ }
+
+ /**
+ * Wrapper around ObjectOutputStream that ignores defaultReadObject
+ * (assumes that needed logic was already executed before), and passes
+ * all other calls to Output
+ */
+ private static class FastutilKryoObjectInputStream extends ObjectInputStream {
+ /** Input */
+ private Input input;
+ /** Kryo */
+ private Kryo kryo;
+
+ /** Constructor */
+ FastutilKryoObjectInputStream() throws IOException {
+ super();
+ }
+
+ /**
+ * Setter
+ *
+ * @param input Input
+ * @param kryo Kryo
+ */
+ public void set(Input input, Kryo kryo) {
+ this.input = input;
+ this.kryo = kryo;
+ }
+
+ @Override
+ public void defaultReadObject() throws IOException, ClassNotFoundException {
+ }
+
+ @Override
+ public boolean readBoolean() throws IOException {
+ return input.readBoolean();
+ }
+
+ @Override
+ public byte readByte() throws IOException {
+ return input.readByte();
+ }
+
+ @Override
+ public char readChar() throws IOException {
+ return input.readChar();
+ }
+
+ @Override
+ public short readShort() throws IOException {
+ return input.readShort();
+ }
+
+ @Override
+ public int readInt() throws IOException {
+ return input.readInt(false);
+ }
+
+ @Override
+ public long readLong() throws IOException {
+ return input.readLong(false);
+ }
+
+ @Override
+ public float readFloat() throws IOException {
+ return input.readFloat();
+ }
+
+ @Override
+ public double readDouble() throws IOException {
+ return input.readDouble();
+ }
+
+ @Override
+ protected Object readObjectOverride()
+ throws IOException, ClassNotFoundException {
+ return kryo.readClassAndObject(input);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/ReusableFieldSerializer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/ReusableFieldSerializer.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/ReusableFieldSerializer.java
new file mode 100644
index 0000000..f11139d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/ReusableFieldSerializer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.writable.kryo.serializers;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.google.common.base.Preconditions;
+
+/**
+ * Serializer used to deserialize data into object, instead of creating a new
+ * value readIntoObject needs to be set right before deserialization is called
+ * on it.
+ *
+ * @param <T> Object type
+ */
+public class ReusableFieldSerializer<T> extends FieldSerializer<T> {
+ /** Current object into which to deserialize */
+ private T readIntoObject;
+
+ /**
+ * Creates new reusable field serializer for a given type.
+ * @param kryo HadoopKryo object
+ * @param type Type of object
+ */
+ public ReusableFieldSerializer(Kryo kryo, Class type) {
+ super(kryo, type);
+ }
+
+ public void setReadIntoObject(T value) {
+ this.readIntoObject = value;
+ }
+
+ @Override
+ protected T create(Kryo kryo, Input input, Class<T> type) {
+ Preconditions.checkNotNull(readIntoObject);
+ Preconditions.checkState(readIntoObject.getClass().equals(type));
+ T toReturn = readIntoObject;
+ readIntoObject = null;
+ return toReturn;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/package-info.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/package-info.java
new file mode 100644
index 0000000..9eabf96
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/serializers/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Implementations of custom serializers needed for HadoopKryo
+ */
+package org.apache.giraph.writable.kryo.serializers;
http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/test/java/org/apache/giraph/writable/kryo/KryoWritableTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/writable/kryo/KryoWritableTest.java b/giraph-core/src/test/java/org/apache/giraph/writable/kryo/KryoWritableTest.java
new file mode 100644
index 0000000..3898b82
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/writable/kryo/KryoWritableTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.writable.kryo;
+
+import static org.junit.Assert.assertEquals;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.giraph.types.ops.collections.BasicArrayList.BasicLongArrayList;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+
+/**
+ * Tests some subtle cases of kryo serialization.
+ */
+public class KryoWritableTest {
+ public static class TestClassA extends KryoWritable {
+ final String testObject;
+ final List list;
+ final int something;
+
+ public TestClassA(String testObject, List list, int something) {
+ this.testObject = testObject;
+ this.list = list;
+ this.something = something;
+ }
+
+ public TestClassA() {
+ this.testObject = null;
+ this.list = null;
+ this.something = -1;
+ }
+ }
+
+ @Test
+ public void testTestClassA() throws Exception {
+ String testObject = "Hello World!";
+ TestClassA res = new TestClassA();
+ WritableUtils.copyInto(
+ new TestClassA(testObject, Arrays.asList(1, 2, 3), 5), res, true);
+
+ assertEquals(testObject, res.testObject);
+
+ assertEquals(3, res.list.size());
+ assertEquals(1, res.list.get(0));
+ assertEquals(2, res.list.get(1));
+ assertEquals(3, res.list.get(2));
+
+ assertEquals(5, res.something);
+ }
+
+ public static class LongKryoWritable extends KryoWritable {
+ private long value;
+
+ public LongKryoWritable(long value) {
+ this.value = value;
+ }
+
+ public long get() {
+ return value;
+ }
+
+ public void set(long value) {
+ this.value = value;
+ }
+ }
+
+
+ int multiplier = 5000; // use 5000 for profiling
+ int longTestTimes = 1000 * multiplier;
+
+ @Test
+ public void testLongKryoWritable() throws Exception {
+ LongKryoWritable from = new LongKryoWritable(0);
+ LongKryoWritable to = new LongKryoWritable(0);
+
+ for (int i = 0; i < longTestTimes; i++) {
+ from.set(i);
+ WritableUtils.copyInto(from, to, true);
+ assertEquals(i, to.get());
+ }
+ }
+
+ @Test
+ public void testLongWritable() throws Exception {
+ LongWritable from = new LongWritable(0);
+ LongWritable to = new LongWritable(0);
+
+ for (int i = 0; i < longTestTimes; i++) {
+ from.set(i);
+ WritableUtils.copyInto(from, to, true);
+ assertEquals(i, to.get());
+ }
+ }
+
+ public static class LongListKryoWritable extends KryoWritable {
+ public LongArrayList value;
+
+ public LongListKryoWritable(LongArrayList value) {
+ this.value = value;
+ }
+ }
+
+ int longListTestTimes = 1 * multiplier;
+ int longListTestSize = 100000;
+
+ @Test
+ public void testLongListKryoWritable() throws Exception {
+ LongArrayList list = new LongArrayList(longListTestSize);
+ for (int i = 0; i < longListTestSize; i++) {
+ list.add(i);
+ }
+
+ LongListKryoWritable from = new LongListKryoWritable(list);
+ LongListKryoWritable to = new LongListKryoWritable(null);
+
+ for (int i = 0; i < longListTestTimes; i++) {
+ from.value.set((2 * i) % longListTestSize, 0);
+ WritableUtils.copyInto(from, to, true);
+ }
+ }
+
+ @Test
+ public void testLongListWritable() throws Exception {
+ BasicLongArrayList from = new BasicLongArrayList(longListTestSize);
+ LongWritable value = new LongWritable();
+ for (int i = 0; i < longListTestSize; i++) {
+ value.set(i);
+ from.add(value);
+ }
+
+ BasicLongArrayList to = new BasicLongArrayList(longListTestSize);
+ value.set(0);
+
+ for (int i = 0; i < longListTestTimes; i++) {
+ from.set((2 * i) % longListTestSize, value);
+ WritableUtils.copyInto(from, to, true);
+ }
+ }
+
+ public static class NestedKryoWritable<T> extends KryoWritable {
+ public LongKryoWritable value1;
+ public T value2;
+
+ public NestedKryoWritable(LongKryoWritable value1, T value2) {
+ this.value1 = value1;
+ this.value2 = value2;
+ }
+ }
+
+ @Test
+ public void testNestedKryoWritable() throws Exception {
+ LongKryoWritable inner = new LongKryoWritable(5);
+ NestedKryoWritable<LongKryoWritable> res = new NestedKryoWritable<>(null, null);
+ WritableUtils.copyInto(
+ new NestedKryoWritable<>(inner, inner), res, true);
+
+ assertEquals(5, res.value1.get());
+ Assert.assertTrue(res.value1 == res.value2);
+ }
+
+ @Test
+ public void testRecursiveKryoWritable() throws Exception {
+ LongKryoWritable inner = new LongKryoWritable(5);
+ NestedKryoWritable wanted = new NestedKryoWritable<>(inner, null);
+ wanted.value2 = wanted;
+
+ NestedKryoWritable res = new NestedKryoWritable<>(null, null);
+ WritableUtils.copyInto(wanted, res, true);
+
+ assertEquals(5, res.value1.get());
+ Assert.assertTrue(res == res.value2);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/06a1084a/giraph-core/src/test/java/org/apache/giraph/writable/kryo/KryoWritableWrapperTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/writable/kryo/KryoWritableWrapperTest.java b/giraph-core/src/test/java/org/apache/giraph/writable/kryo/KryoWritableWrapperTest.java
new file mode 100644
index 0000000..2291f16
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/writable/kryo/KryoWritableWrapperTest.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.writable.kryo;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import it.unimi.dsi.fastutil.chars.Char2ObjectMap;
+import it.unimi.dsi.fastutil.chars.Char2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.floats.FloatArrayList;
+import it.unimi.dsi.fastutil.ints.Int2BooleanMap;
+import it.unimi.dsi.fastutil.ints.Int2BooleanOpenHashMap;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConfigurationSettable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.giraph.writable.kryo.markers.NonKryoWritable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Iterators;
+
+
+
+/**
+ * Tests some subtle cases of kryo serialization.
+ */
+public class KryoWritableWrapperTest {
+ public static <T> T kryoSerDeser(T t) throws IOException {
+ KryoWritableWrapper<T> wrapped = new KryoWritableWrapper<>(t);
+ KryoWritableWrapper<T> deser = new KryoWritableWrapper<>();
+ WritableUtils.copyInto(wrapped, deser, true);
+ return deser.get();
+ }
+
+ @Test
+ public void testArraysAsList() throws IOException {
+ List res = kryoSerDeser(Arrays.asList(1, 2, 3));
+
+ assertEquals(3, res.size());
+ assertEquals(1, res.get(0));
+ assertEquals(2, res.get(1));
+ assertEquals(3, res.get(2));
+ }
+
+
+ @Test
+ public void testArraysAsListMultiRef() throws IOException {
+ List list = Arrays.asList(1, 2, 3);
+ Object obj = new Object();
+ List wanted = Arrays.asList(list, list, obj, obj, null);
+ wanted.set(4, wanted);
+ List res = kryoSerDeser(wanted);
+
+ assertTrue(res.get(0) == res.get(1));
+ assertTrue(res.get(2) == res.get(3));
+ // TODO see if this can be supported, though this is a rare case:
+ // assertTrue(res == res.get(4));
+ }
+
+ @Test
+ public void testCollectionsNCopiesList() throws IOException {
+ List res = kryoSerDeser(Collections.nCopies(3, 42));
+
+ assertEquals(3, res.size());
+ assertEquals(42, res.get(0));
+ assertEquals(42, res.get(1));
+ assertEquals(42, res.get(2));
+ }
+
+ @Test
+ public void testCollectionsNCopiesObjectList() throws IOException {
+ String testObject = "Hello World!";
+ List<String> res = kryoSerDeser(Collections.nCopies(3, testObject));
+
+ assertEquals(3, res.size());
+ assertEquals(testObject, res.get(0));
+ assertEquals(testObject, res.get(1));
+ assertEquals(testObject, res.get(2));
+ }
+
+ @Test
+ public void testUnmodifiableIterator() throws IOException {
+ Iterator<Integer> in = Iterables.concat(
+ Arrays.asList(0, 1),
+ Arrays.asList(2, 3),
+ Arrays.asList(4, 5)).iterator();
+
+ in.next();
+ in.next();
+ in.next();
+ Iterator res = kryoSerDeser(in);
+
+ int cnt = 3;
+ for(; res.hasNext(); cnt++) {
+ assertEquals(cnt, res.next());
+ }
+ assertEquals(6, cnt);
+ }
+
+ @Test
+ public void testIteratorsConcat() throws IOException {
+ Iterator<Integer> in = Iterators.concat(
+ Arrays.asList(0, 1).iterator(),
+ Arrays.asList(2, 3).iterator(),
+ Arrays.asList(4, 5).iterator());
+
+ in.next();
+ in.next();
+ in.next();
+
+ Iterator res = kryoSerDeser(in);
+
+ int cnt = 3;
+ for(; res.hasNext(); cnt++) {
+ assertEquals(cnt, res.next());
+ }
+ assertEquals(6, cnt);
+
+ }
+
+ @Test
+ public void testImmutableList() throws IOException {
+ {
+ List res = kryoSerDeser(ImmutableList.of(1, 2));
+ assertEquals(2, res.size());
+ assertEquals(1, res.get(0));
+ assertEquals(2, res.get(1));
+ }
+
+ {
+ List list = ImmutableList.of(1, 2, 3);
+ Object obj = new Object();
+ List wanted = ImmutableList.of(list, list, obj, obj);
+ List res = kryoSerDeser(wanted);
+
+ assertTrue(res.get(0) == res.get(1));
+ assertTrue(res.get(2) == res.get(3));
+ }
+ }
+
+ @Test
+ public void testFastutilSet() throws ClassNotFoundException, IOException {
+ LongOpenHashSet set = new LongOpenHashSet();
+ set.add(6);
+ LongOpenHashSet deser = kryoSerDeser(set);
+ deser.add(5);
+ set.add(5);
+ Assert.assertEquals(set, deser);
+ }
+
+ @Test
+ public void testFastutilLongList() throws ClassNotFoundException, IOException {
+ LongArrayList list = new LongArrayList();
+ list.add(6);
+ LongArrayList deser = kryoSerDeser(list);
+ deser.add(5);
+ list.add(5);
+ Assert.assertEquals(list, deser);
+ }
+
+ @Test
+ public void testFastutilFloatList() throws ClassNotFoundException, IOException {
+ FloatArrayList list = new FloatArrayList();
+ list.add(6L);
+ FloatArrayList deser = kryoSerDeser(list);
+ deser.add(5L);
+ list.add(5L);
+ Assert.assertEquals(list, deser);
+ }
+
+ @Test
+ public void testFastutilMap() throws ClassNotFoundException, IOException {
+ Int2BooleanMap list = new Int2BooleanOpenHashMap();
+ list.put(6, true);
+ Int2BooleanMap deser = kryoSerDeser(list);
+ deser.put(5, false);
+ list.put(5, false);
+ Assert.assertEquals(list, deser);
+ }
+
+ @Test
+ public void testFastutil2ObjMap() throws ClassNotFoundException, IOException {
+ Char2ObjectMap<IntWritable> list = new Char2ObjectOpenHashMap<>();
+ list.put('a', new IntWritable(6));
+ list.put('q', new IntWritable(7));
+ list.put('w', new IntWritable(8));
+ list.put('e', new IntWritable(9));
+ list.put('r', new IntWritable(7));
+ list.put('c', null);
+ Char2ObjectMap<IntWritable> deser = kryoSerDeser(list);
+ deser.put('b', null);
+ list.put('b', null);
+
+ Assert.assertEquals(list, deser);
+ }
+
+ @Test
+ @Ignore("Long test used for profiling compiling speed")
+ public void testLongFastutilListProfile() throws ClassNotFoundException, IOException {
+ int n = 100;
+ int rounds = 2000000;
+ LongArrayList list = new LongArrayList(n);
+ for (int i = 0; i < n; i++) {
+ list.add(i);
+ }
+
+ for (int round = 0; round < rounds; round ++) {
+ LongArrayList deser = kryoSerDeser(list);
+ deser.add(round);
+ list.add(round);
+ Assert.assertEquals(list.size(), deser.size());
+ Assert.assertArrayEquals(list.elements(), deser.elements());
+
+ list.popLong();
+ }
+ }
+
+ @Test(expected=RuntimeException.class)
+ public void testRandom() throws ClassNotFoundException, IOException {
+ kryoSerDeser(new Random()).nextBoolean();
+ }
+
+ private static class TestConf implements GiraphConfigurationSettable {
+ @Override
+ public void setConf(ImmutableClassesGiraphConfiguration configuration) {
+ }
+ }
+
+ @Test(expected=RuntimeException.class)
+ public void testConfiguration() throws ClassNotFoundException, IOException {
+ kryoSerDeser(new Configuration());
+ }
+
+ @Test(expected=RuntimeException.class)
+ public void testConfigurable() throws ClassNotFoundException, IOException {
+ kryoSerDeser(new TestConf());
+ }
+
+ @Test(expected=RuntimeException.class)
+ public void testVertexReceiver() throws ClassNotFoundException, IOException {
+ kryoSerDeser(new NonKryoWritable() {
+ });
+ }
+
+
+ @Test
+ public void testBlacklistedClasses() throws ClassNotFoundException, IOException {
+ Assert.assertEquals(kryoSerDeser(Random.class), Random.class);
+ Assert.assertEquals(kryoSerDeser(TestConf.class), TestConf.class);
+ Assert.assertEquals(kryoSerDeser(GiraphConfiguration.class), GiraphConfiguration.class);
+ }
+
+ @Test(expected=RuntimeException.class)
+ public void testRecursive() throws ClassNotFoundException, IOException {
+ kryoSerDeser(new KryoWritableWrapper<>(new Object())).get().hashCode();
+ }
+
+ @Test
+ public void testNull() throws ClassNotFoundException, IOException {
+ Assert.assertNull(kryoSerDeser(null));
+ }
+}