You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by di...@apache.org on 2018/05/15 05:45:57 UTC
git commit: updated refs/heads/trunk to b2d7741
Repository: giraph
Updated Branches:
refs/heads/trunk 345f3db49 -> b2d77411a
GIRAPH-1188
closes #70
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/b2d77411
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/b2d77411
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/b2d77411
Branch: refs/heads/trunk
Commit: b2d77411ae4a43ddd2386c4a1db5e0191790acf6
Parents: 345f3db
Author: Yuksel Akinci <yu...@fb.com>
Authored: Mon May 14 22:45:14 2018 -0700
Committer: Dionysios Logothetis <di...@fb.com>
Committed: Mon May 14 22:45:14 2018 -0700
----------------------------------------------------------------------
.../no_vtx/MessagesWithoutVerticesTest.java | 17 +
.../java/org/apache/giraph/bsp/BspService.java | 14 +-
.../org/apache/giraph/conf/GiraphConstants.java | 10 +
.../org/apache/giraph/writable/kryo/Boxed.java | 36 ++
.../writable/kryo/GiraphClassResolver.java | 371 +++++++++++++++++
.../apache/giraph/writable/kryo/HadoopKryo.java | 15 +-
.../giraph/writable/kryo/KryoSimpleWrapper.java | 2 +-
.../java/org/apache/giraph/zk/ZooKeeperExt.java | 4 +-
.../PageRankWithKryoSimpleWritable.java | 393 +++++++++++++++++++
.../giraph/examples/TestKryoPageRank.java | 98 +++++
10 files changed, 951 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/b2d77411/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/framework/no_vtx/MessagesWithoutVerticesTest.java
----------------------------------------------------------------------
diff --git a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/framework/no_vtx/MessagesWithoutVerticesTest.java b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/framework/no_vtx/MessagesWithoutVerticesTest.java
index bf3e194..dcdb002 100644
--- a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/framework/no_vtx/MessagesWithoutVerticesTest.java
+++ b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/framework/no_vtx/MessagesWithoutVerticesTest.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.giraph.block_app.framework.no_vtx;
import java.util.Iterator;
http://git-wip-us.apache.org/repos/asf/giraph/blob/b2d77411/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
index c3fd141..632a1e6 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
@@ -25,6 +25,7 @@ import org.apache.giraph.job.JobProgressTracker;
import org.apache.giraph.partition.GraphPartitionerFactory;
import org.apache.giraph.utils.CheckpointingUtils;
import org.apache.giraph.worker.WorkerInfo;
+import org.apache.giraph.writable.kryo.GiraphClassResolver;
import org.apache.giraph.zk.BspEvent;
import org.apache.giraph.zk.PredicateLock;
import org.apache.giraph.zk.ZooKeeperExt;
@@ -81,7 +82,9 @@ public abstract class BspService<I extends WritableComparable,
/** Input splits all done node*/
public static final String INPUT_SPLITS_ALL_DONE_NODE =
"/_inputSplitsAllDone";
-
+ /** Directory to store kryo className-ID assignment */
+ public static final String KRYO_REGISTERED_CLASS_DIR =
+ "/_kryo";
/** Directory of attempts of this application */
public static final String APPLICATION_ATTEMPTS_DIR =
"/_applicationAttemptsDir";
@@ -155,6 +158,8 @@ public abstract class BspService<I extends WritableComparable,
protected final String haltComputationPath;
/** Path where memory observer stores data */
protected final String memoryObserverPath;
+ /** Kryo className-ID mapping directory */
+ protected final String kryoRegisteredClassPath;
/** Private ZooKeeper instance that implements the service */
private final ZooKeeperExt zk;
/** Has the Connection occurred? */
@@ -250,7 +255,7 @@ public abstract class BspService<I extends WritableComparable,
inputSplitsAllDonePath = basePath + INPUT_SPLITS_ALL_DONE_NODE;
applicationAttemptsPath = basePath + APPLICATION_ATTEMPTS_DIR;
cleanedUpPath = basePath + CLEANED_UP_DIR;
-
+ kryoRegisteredClassPath = basePath + KRYO_REGISTERED_CLASS_DIR;
String restartJobId = RESTART_JOB_ID.get(conf);
@@ -289,6 +294,11 @@ public abstract class BspService<I extends WritableComparable,
throw new RuntimeException(e);
}
+ boolean disableGiraphResolver =
+ GiraphConstants.DISABLE_GIRAPH_CLASS_RESOLVER.get(conf);
+ if (!disableGiraphResolver) {
+ GiraphClassResolver.setZookeeperInfo(zk, kryoRegisteredClassPath);
+ }
this.taskId = (int) getApplicationAttempt() * conf.getMaxWorkers() +
conf.getTaskPartition();
this.hostnameTaskId = hostname + "_" + getTaskId();
http://git-wip-us.apache.org/repos/asf/giraph/blob/b2d77411/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index db13670..4c02fff 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -1292,5 +1292,15 @@ public interface GiraphConstants {
/** Number of supersteps job will run for */
IntConfOption SUPERSTEP_COUNT = new IntConfOption("giraph.numSupersteps", -1,
"Number of supersteps job will run for");
+
+ /** Whether to disable GiraphClassResolver which is an efficient
+ * implementation of kryo class resolver. By default this resolver is used by
+ * KryoSimpleWritable and KryoSimpleWrapper, and can be disabled with this
+ * option */
+ BooleanConfOption DISABLE_GIRAPH_CLASS_RESOLVER =
+ new BooleanConfOption("giraph.disableGiraphClassResolver", false,
+ "Disables GiraphClassResolver, which is a custom implementation " +
+ "of kryo class resolver that avoids writing class names to the " +
+ "underlying stream for faster serialization.");
}
// CHECKSTYLE: resume InterfaceIsTypeCheck
http://git-wip-us.apache.org/repos/asf/giraph/blob/b2d77411/giraph-core/src/main/java/org/apache/giraph/writable/kryo/Boxed.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/Boxed.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/Boxed.java
new file mode 100644
index 0000000..087f0dd
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/Boxed.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.writable.kryo;
+
+/**
+ * Boxed interface
+ * @param <T> Type of the boxed value
+ */
+public interface Boxed<T> {
+ /**
+ * Gets the boxed value.
+ * @return Boxed object.
+ */
+ T get();
+
+ /**
+ * Sets the boxed value.
+ * @param value Value
+ */
+ void set(T value);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/b2d77411/giraph-core/src/main/java/org/apache/giraph/writable/kryo/GiraphClassResolver.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/GiraphClassResolver.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/GiraphClassResolver.java
new file mode 100644
index 0000000..80e7011
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/GiraphClassResolver.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.writable.kryo;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoException;
+import com.esotericsoftware.kryo.Registration;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.util.DefaultClassResolver;
+import com.esotericsoftware.kryo.util.ObjectMap;
+import org.apache.giraph.zk.ZooKeeperExt;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.List;
+
+import static com.esotericsoftware.kryo.util.Util.getWrapperClass;
+
+/**
+ * In order to avoid writing class names to the stream, this class resolver
+ * assigns unique integers to each class name and writes/reads those integers
+ * to/from the stream. Reads assume that there is already a class assigned
+ * to the given integer. This resolver only assigns unique integers for
+ * classes that are not explicitly registered since those classes are already
+ * assigned unique integers at the time of registration. This implementation
+ * uses zookeeper to provide consistent class name to ID mapping across all
+ * nodes.
+ *
+ *
+ * If resolver encounters a class name that has not been assigned to a unique
+ * integer yet, it creates a class node in zookeeper under a designated path
+ * with persistent_sequential mode - allowing the file name of the class node
+ * to be suffixed with an auto incremented integer. After the class node is
+ * created, the resolver reads back all the nodes under the designated path
+ * and uses the unique suffix as the class id. If there are duplicate entries
+ * for the same class name due to some race condition, the lowest suffix is
+ * used.
+ */
+public class GiraphClassResolver extends DefaultClassResolver {
+ /** Base ID to start for class name assignments.
+ * This number has to be high enough to not conflict with
+ * explicitly registered class IDs.
+ * */
+ private static final int BASE_CLASS_ID = 1000;
+
+ /** Class logger */
+ private static final Logger LOG =
+ Logger.getLogger(GiraphClassResolver.class);
+
+ /** Class name to ID cache */
+ private static Map<String, Integer> CLASS_NAME_TO_ID = new HashMap();
+ /** ID to class name cache */
+ private static Map<Integer, String> ID_TO_CLASS_NAME = new HashMap();
+ /** Zookeeper */
+ private static ZooKeeperExt ZK;
+ /** Zookeeper path for automatic class registrations */
+ private static String KRYO_REGISTERED_CLASS_PATH;
+ /** Minimum class ID assigned by zookeeper sequencing */
+ private static int MIN_CLASS_ID = -1;
+ /** True if the zookeeper class registration path is already created */
+ private static boolean IS_CLASS_PATH_CREATED = false;
+
+ /** Memoized class id*/
+ private int memoizedClassId = -1;
+ /** Memoized class registration */
+ private Registration memoizedClassIdValue;
+
+ /**
+ * Sets zookeeper information.
+ * @param zookeeperExt ZookeeperExt
+ * @param kryoClassPath Zookeeper directory path where class Name-ID
+ * mapping is stored.
+ */
+ public static void setZookeeperInfo(ZooKeeperExt zookeeperExt,
+ String kryoClassPath) {
+ ZK = zookeeperExt;
+ KRYO_REGISTERED_CLASS_PATH = kryoClassPath;
+ }
+
+ /**
+ * Returns true if the zookeeper is initialized.
+ * @return True if the zookeeper is initialized.
+ */
+ public static boolean isInitialized() {
+ return ZK != null;
+ }
+
+ /**
+ * Creates a new node for the given class name.
+ * Creation mode is persistent sequential, i.e.
+ * ZK will always create a new node. There could be
+ * multiple entries for the same class name but since
+ * the lowest index is used, this is not a problem.
+ * @param className Class name
+ */
+ public static void createClassName(String className) {
+ try {
+ String path = KRYO_REGISTERED_CLASS_PATH + "/" + className;
+ ZK.createExt(path,
+ null,
+ ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT_SEQUENTIAL,
+ true);
+ } catch (KeeperException e) {
+ throw new IllegalStateException(
+ "Failed to create class " + className, e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "Interrupted while creating " + className, e);
+ }
+ }
+
+ /**
+ * Refreshes class-ID mapping from zookeeper.
+ * Not thread safe.
+ */
+ public static void refreshCache() {
+ if (!IS_CLASS_PATH_CREATED) {
+ try {
+ ZK.createOnceExt(KRYO_REGISTERED_CLASS_PATH,
+ null,
+ ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT,
+ true);
+ IS_CLASS_PATH_CREATED = true;
+ } catch (KeeperException e) {
+ throw new IllegalStateException(
+ "Failed to refresh kryo cache " +
+ KRYO_REGISTERED_CLASS_PATH, e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "Interrupted while refreshing kryo cache " +
+ KRYO_REGISTERED_CLASS_PATH, e);
+ }
+ }
+
+ List<String> registeredList;
+ try {
+ registeredList =
+ ZK.getChildrenExt(KRYO_REGISTERED_CLASS_PATH,
+ false,
+ true,
+ false);
+ } catch (KeeperException e) {
+ throw new IllegalStateException(
+ "Failed to retrieve child nodes for " + KRYO_REGISTERED_CLASS_PATH, e);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "Interrupted while retrieving child nodes for " +
+ KRYO_REGISTERED_CLASS_PATH, e);
+ }
+
+ for (String name : registeredList) {
+ // Since these files are created with PERSISTENT_SEQUENTIAL mode,
+ // ZooKeeper appends a sequential number to their file name.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Registered class: " + name);
+ }
+ String className = name.substring(0,
+ name.length() - ZooKeeperExt.SEQUENCE_NUMBER_LENGTH);
+ int classId = Integer.parseInt(
+ name.substring(name.length() - ZooKeeperExt.SEQUENCE_NUMBER_LENGTH));
+
+ if (MIN_CLASS_ID == -1) {
+ MIN_CLASS_ID = classId;
+ }
+
+ int adjustedId = classId - MIN_CLASS_ID + BASE_CLASS_ID;
+ if (CLASS_NAME_TO_ID.putIfAbsent(className, adjustedId) == null) {
+ ID_TO_CLASS_NAME.put(adjustedId, className);
+ }
+ }
+ }
+
+ /**
+ * Gets ID for the given class name.
+ * @param className Class name
+ * @return Class ID
+ */
+ public static int getClassId(String className) {
+ if (CLASS_NAME_TO_ID.containsKey(className)) {
+ return CLASS_NAME_TO_ID.get(className);
+ }
+ synchronized (GiraphClassResolver.class) {
+ if (CLASS_NAME_TO_ID.containsKey(className)) {
+ return CLASS_NAME_TO_ID.get(className);
+ }
+ refreshCache();
+
+ if (!CLASS_NAME_TO_ID.containsKey(className)) {
+ createClassName(className);
+ refreshCache();
+ }
+ }
+
+ if (!CLASS_NAME_TO_ID.containsKey(className)) {
+ throw new IllegalStateException("Failed to assigned id to " + className);
+ }
+
+ return CLASS_NAME_TO_ID.get(className);
+ }
+
+ /**
+ * Get class name for given ID.
+ * @param id class ID
+ * @return class name
+ */
+ public static String getClassName(int id) {
+ if (ID_TO_CLASS_NAME.containsKey(id)) {
+ return ID_TO_CLASS_NAME.get(id);
+ }
+ synchronized (GiraphClassResolver.class) {
+ if (ID_TO_CLASS_NAME.containsKey(id)) {
+ return ID_TO_CLASS_NAME.get(id);
+ }
+ refreshCache();
+ }
+
+ if (!ID_TO_CLASS_NAME.containsKey(id)) {
+ throw new IllegalStateException("ID " + id + " doesn't exist");
+ }
+ return ID_TO_CLASS_NAME.get(id);
+ }
+
+ @Override
+ public Registration register(Registration registration) {
+ if (registration == null) {
+ throw new IllegalArgumentException("registration cannot be null");
+ }
+ if (registration.getId() == NAME) {
+ throw new IllegalArgumentException("Invalid registration ID");
+ }
+
+ idToRegistration.put(registration.getId(), registration);
+ classToRegistration.put(registration.getType(), registration);
+ if (registration.getType().isPrimitive()) {
+ classToRegistration.put(getWrapperClass(registration.getType()),
+ registration);
+ }
+ return registration;
+ }
+
+ @Override
+ public Registration registerImplicit(Class type) {
+ return register(new Registration(type, kryo.getDefaultSerializer(type),
+ getClassId(type.getName())));
+ }
+
+ @Override
+ public Registration writeClass(Output output, Class type) {
+ if (type == null) {
+ output.writeVarInt(Kryo.NULL, true);
+ return null;
+ }
+
+ Registration registration = kryo.getRegistration(type);
+ if (registration.getId() == NAME) {
+ throw new IllegalStateException("Invalid registration ID");
+ } else {
+ // Class ID's are incremented by 2 when writing, because 0 is used
+ // for null and 1 is used for non-explicitly registered classes.
+ output.writeVarInt(registration.getId() + 2, true);
+ }
+ return registration;
+ }
+
+ @Override
+ public Registration readClass(Input input) {
+ int classID = input.readVarInt(true);
+ if (classID == Kryo.NULL) {
+ return null;
+ } else if (classID == NAME + 2) {
+ throw new IllegalStateException("Invalid class ID");
+ }
+ if (classID == memoizedClassId) {
+ return memoizedClassIdValue;
+ }
+ Registration registration = idToRegistration.get(classID - 2);
+ if (registration == null) {
+ String className = getClassName(classID - 2);
+ Class type = getTypeByName(className);
+ if (type == null) {
+ try {
+ type = Class.forName(className, false, kryo.getClassLoader());
+ } catch (ClassNotFoundException ex) {
+ throw new KryoException("Unable to find class: " + className, ex);
+ }
+ if (nameToClass == null) {
+ nameToClass = new ObjectMap();
+ }
+ nameToClass.put(className, type);
+ }
+ registration = new Registration(type, kryo.getDefaultSerializer(type),
+ classID - 2);
+ register(registration);
+ }
+ memoizedClassId = classID;
+ memoizedClassIdValue = registration;
+ return registration;
+ }
+
+ /**
+ * Reset the internal state
+ * Reset clears two hash tables:
+ * 1 - Class name to ID: Every non-explicitly registered class takes the
+ * ID agreed by all kryo instances, and it doesn't change across
+ * serializations, so this reset is not required.
+ * 2 - Reference tracking: Not required because it is disabled.
+ *
+ * Therefore, this method should not be invoked.
+ *
+ */
+ public void reset() {
+ throw new IllegalStateException("Not implemented");
+ }
+
+ /**
+ * This method writes the class name for the first encountered
+ * non-explicitly registered class. Since all non-explicitly registered
+ * classes take the ID agreed by all kryo instances, there is no need
+ * to write the class name, so this method should not be invoked.
+ * @param output Output stream
+ * @param type Class type
+ * @param registration Registration
+ */
+ @Override
+ protected void writeName(Output output, Class type,
+ Registration registration) {
+ throw new IllegalStateException("Not implemented");
+ }
+
+ /**
+ * This method reads the class name for the first encountered
+ * non-explicitly registered class. Since all non-explicitly registered
+ * classes take the ID agreed by all kryo instances, class name is
+ * never written, so this method should not be invoked.
+ * @param input Input stream
+ * @return Registration
+ */
+ @Override
+ protected Registration readName(Input input) {
+ throw new IllegalStateException("Not implemented");
+ }
+
+ /**
+ * Get type by class name.
+ * @param className Class name
+ * @return class type
+ */
+ protected Class<?> getTypeByName(final String className) {
+ return nameToClass != null ? nameToClass.get(className) : null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/b2d77411/giraph-core/src/main/java/org/apache/giraph/writable/kryo/HadoopKryo.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/HadoopKryo.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/HadoopKryo.java
index fb1186b..2713316 100644
--- a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/HadoopKryo.java
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/HadoopKryo.java
@@ -308,8 +308,12 @@ public class HadoopKryo extends Kryo {
if (trackReferences) {
kryo = new HadoopKryo();
} else {
- // TODO: if trackReferences is false use custom class resolver.
- kryo = new HadoopKryo(new DefaultClassResolver(),
+ // Only use GiraphClassResolver if it is properly initialized.
+ // This is to enable test cases which use KryoSimpleWrapper
+ // but don't start ZK.
+ kryo = new HadoopKryo(
+ GiraphClassResolver.isInitialized() ? new GiraphClassResolver() :
+ new DefaultClassResolver(),
new MapReferenceResolver());
}
@@ -406,8 +410,11 @@ public class HadoopKryo extends Kryo {
if (!trackReferences) {
kryo.setReferences(false);
- // TODO: Enable the following when a custom class resolver is created.
- // kryo.setAutoReset(false);
+ // Auto reset can only be disabled if the GiraphClassResolver is
+ // properly initialized.
+ if (GiraphClassResolver.isInitialized()) {
+ kryo.setAutoReset(false);
+ }
}
return kryo;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/b2d77411/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoSimpleWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoSimpleWrapper.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoSimpleWrapper.java
index 9c5de74..3cb291d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoSimpleWrapper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoSimpleWrapper.java
@@ -39,7 +39,7 @@ import org.apache.hadoop.io.Writable;
*
* @param <T> Object type
*/
-public class KryoSimpleWrapper<T> implements Writable {
+public class KryoSimpleWrapper<T> implements Writable, Boxed<T> {
/** Wrapped object */
private T object;
http://git-wip-us.apache.org/repos/asf/giraph/blob/b2d77411/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java
index 1eb4c8b..a20c494 100644
--- a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java
+++ b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java
@@ -42,10 +42,10 @@ import org.apache.zookeeper.ZooKeeper;
* should be thread-safe.
*/
public class ZooKeeperExt {
+ /** Length of the ZK sequence number */
+ public static final int SEQUENCE_NUMBER_LENGTH = 10;
/** Internal logger */
private static final Logger LOG = Logger.getLogger(ZooKeeperExt.class);
- /** Length of the ZK sequence number */
- private static final int SEQUENCE_NUMBER_LENGTH = 10;
/** Internal ZooKeeper */
private final ZooKeeper zooKeeper;
/** Ensure we have progress */
http://git-wip-us.apache.org/repos/asf/giraph/blob/b2d77411/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankWithKryoSimpleWritable.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankWithKryoSimpleWritable.java b/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankWithKryoSimpleWritable.java
new file mode 100644
index 0000000..4e9e1ed
--- /dev/null
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankWithKryoSimpleWritable.java
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import com.google.common.collect.Lists;
+import org.apache.giraph.aggregators.BasicAggregator;
+import org.apache.giraph.aggregators.LongSumAggregator;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.VertexReader;
+import org.apache.giraph.io.formats.GeneratedVertexInputFormat;
+import org.apache.giraph.master.DefaultMasterCompute;
+import org.apache.giraph.worker.WorkerContext;
+import org.apache.giraph.writable.kryo.KryoSimpleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.giraph.examples.PageRankWithKryoSimpleWritable.VertexValue;
+import org.apache.giraph.examples.PageRankWithKryoSimpleWritable.MessageValue;
+import org.apache.giraph.examples.PageRankWithKryoSimpleWritable.EdgeValue;
+
+/**
+ * Copy of SimplePageRank, modified to test vertex/edge and
+ * message values that derive from KryoSimpleWritable.
+ */
+@Algorithm(
+ name = "Page rank"
+)
+public class PageRankWithKryoSimpleWritable extends
+ BasicComputation<LongWritable, VertexValue,
+ EdgeValue, MessageValue> {
+ /** Number of supersteps for this test */
+ public static final int MAX_SUPERSTEPS = 30;
+ /** Number of supersteps for this static 3;
+ /** Logger */
+ private static final Logger LOG =
+ Logger.getLogger(PageRankWithKryoSimpleWritable.class);
+ /** Sum aggregator name */
+ private static String SUM_AGG = "sum";
+ /** Min aggregator name */
+ private static String MIN_AGG = "min";
+ /** Max aggregator name */
+ private static String MAX_AGG = "max";
+
+ @Override
+ public void compute(
+ Vertex<LongWritable, VertexValue,
+ EdgeValue> vertex,
+ Iterable<MessageValue> messages) throws IOException {
+ if (getSuperstep() >= 1) {
+ double sum = 0;
+ for (MessageValue message : messages) {
+ sum += message.get();
+ }
+ Double value = (0.15f / getTotalNumVertices()) + 0.85f * sum;
+ VertexValue vertexValue = new VertexValue(value);
+ vertex.setValue(vertexValue);
+ aggregate(MAX_AGG, vertexValue);
+ aggregate(MIN_AGG, vertexValue);
+ aggregate(SUM_AGG, new LongWritable(1));
+ LOG.info(vertex.getId() + ": PageRank=" + vertexValue +
+ " max=" + getAggregatedValue(MAX_AGG) +
+ " min=" + getAggregatedValue(MIN_AGG));
+ }
+
+ if (getSuperstep() < MAX_SUPERSTEPS) {
+ long edges = vertex.getNumEdges();
+ sendMessageToAllEdges(vertex,
+ new MessageValue(vertex.getValue().get() / edges));
+ } else {
+ vertex.voteToHalt();
+ }
+ }
+
+ /**
+ * Worker context used with {@link PageRankWithKryoSimpleWritable}.
+ */
+ public static class PageRankWithKryoWorkerContext extends
+ WorkerContext {
+ /** Final max value for verification for local jobs */
+ private static double FINAL_MAX;
+ /** Final min value for verification for local jobs */
+ private static double FINAL_MIN;
+ /** Final sum value for verification for local jobs */
+ private static long FINAL_SUM;
+
+ public static double getFinalMax() {
+ return FINAL_MAX;
+ }
+
+ public static double getFinalMin() {
+ return FINAL_MIN;
+ }
+
+ public static long getFinalSum() {
+ return FINAL_SUM;
+ }
+
+ @Override
+ public void preApplication()
+ throws InstantiationException, IllegalAccessException {
+ }
+
+ @Override
+ public void postApplication() {
+ FINAL_SUM = this.<LongWritable>getAggregatedValue(SUM_AGG).get();
+ FINAL_MAX = this.<VertexValue>getAggregatedValue(MAX_AGG).get();
+ FINAL_MIN = this.<VertexValue>getAggregatedValue(MIN_AGG).get();
+
+ LOG.info("aggregatedNumVertices=" + FINAL_SUM);
+ LOG.info("aggregatedMaxPageRank=" + FINAL_MAX);
+ LOG.info("aggregatedMinPageRank=" + FINAL_MIN);
+ }
+
+ @Override
+ public void preSuperstep() {
+ if (getSuperstep() >= 3) {
+ LOG.info("aggregatedNumVertices=" +
+ getAggregatedValue(SUM_AGG) +
+ " NumVertices=" + getTotalNumVertices());
+ if (this.<LongWritable>getAggregatedValue(SUM_AGG).get() !=
+ getTotalNumVertices()) {
+ throw new RuntimeException("wrong value of SumAggreg: " +
+ getAggregatedValue(SUM_AGG) + ", should be: " +
+ getTotalNumVertices());
+ }
+ VertexValue maxPagerank = getAggregatedValue(MAX_AGG);
+ LOG.info("aggregatedMaxPageRank=" + maxPagerank.get());
+ VertexValue minPagerank = getAggregatedValue(MIN_AGG);
+ LOG.info("aggregatedMinPageRank=" + minPagerank.get());
+ }
+ }
+
+ @Override
+ public void postSuperstep() { }
+ }
+
+ /**
+ * Master compute associated with {@link PageRankWithKryoSimpleWritable}.
+ * It registers required aggregators.
+ */
+ public static class PageRankWithKryoMasterCompute extends
+ DefaultMasterCompute {
+ @Override
+ public void initialize() throws InstantiationException,
+ IllegalAccessException {
+ registerAggregator(SUM_AGG, LongSumAggregator.class);
+ registerPersistentAggregator(MIN_AGG, DoubleMinWrapperAggregator.class);
+ registerPersistentAggregator(MAX_AGG, DoubleMaxWrapperAggregator.class);
+ }
+ }
+
+ /**
+ * Simple VertexReader that supports {@link PageRankWithKryoSimpleWritable}
+ */
+ public static class PageRankWithKryoVertexReader extends
+ GeneratedVertexReader<LongWritable, VertexValue, EdgeValue> {
+ /** Class logger */
+ private static final Logger LOG =
+ Logger.getLogger(
+ PageRankWithKryoSimpleWritable.PageRankWithKryoVertexReader.class);
+
+ @Override
+ public boolean nextVertex() {
+ return totalRecords > recordsRead;
+ }
+
+ @Override
+ public Vertex<LongWritable, VertexValue, EdgeValue>
+ getCurrentVertex() throws IOException {
+ Vertex<LongWritable, VertexValue, EdgeValue> vertex =
+ getConf().createVertex();
+ LongWritable vertexId = new LongWritable(
+ (inputSplit.getSplitIndex() * totalRecords) + recordsRead);
+ VertexValue vertexValue = new VertexValue(vertexId.get() * 10d);
+ long targetVertexId =
+ (vertexId.get() + 1) %
+ (inputSplit.getNumSplits() * totalRecords);
+ float edgeValue = vertexId.get() * 100f;
+ List<Edge<LongWritable, EdgeValue>> edges = Lists.newLinkedList();
+ edges.add(EdgeFactory.create(new LongWritable(targetVertexId),
+ new EdgeValue(edgeValue)));
+ vertex.initialize(vertexId, vertexValue, edges);
+ ++recordsRead;
+ if (LOG.isInfoEnabled()) {
+ LOG.info("next: Return vertexId=" + vertex.getId().get() +
+ ", vertexValue=" + vertex.getValue() +
+ ", targetVertexId=" + targetVertexId + ", edgeValue=" + edgeValue);
+ }
+ return vertex;
+ }
+ }
+
+ /**
+  * VertexInputFormat that supports {@link PageRankWithKryoSimpleWritable}.
+  * Every call hands out a fresh {@link PageRankWithKryoVertexReader}.
+  */
+ public static class PageRankWithKryoVertexInputFormat extends
+     GeneratedVertexInputFormat<LongWritable, VertexValue, EdgeValue> {
+   @Override
+   public VertexReader<LongWritable, VertexValue, EdgeValue>
+   createVertexReader(InputSplit split, TaskAttemptContext context)
+     throws IOException {
+     // Neither the split nor the context is consulted here.
+     return new PageRankWithKryoVertexReader();
+   }
+ }
+
+ /**
+ * Creating a custom vertex value class to force kryo to
+ * register with a new ID. Please note that a custom
+ * class containing a double array is not
+ * necessary for the page rank application. It is only
+ * used for testing the scenario of kryo encountering an
+ * unregistered custom class.
+ */
+ public static class VertexValue extends KryoSimpleWritable {
+ /** Storing the value in an array.
+ Double array is an unregistered type
+ hence kryo will assign a unique class id */
+ private double[] ranks;
+
+ /** Constructor */
+ public VertexValue() {
+ }
+
+ /**
+ * Constructor
+ * @param val Vertex value
+ */
+ public VertexValue(Double val) {
+ ranks = new double[1];
+
+ ranks[0] = val;
+ }
+
+ /**
+ * Get vertex value
+ * @return Vertex value
+ */
+ public Double get() {
+ return ranks[0];
+ }
+
+ /**
+ * Set vertex value.
+ * @param val Vertex value
+ */
+ public void set(Double val) {
+ this.ranks[0] = val;
+ }
+ }
+
+ /**
+  * Custom edge value class whose purpose is to force kryo to
+  * register with a new ID. A custom class wrapping a float is not
+  * required by the page rank application itself; it exists only to
+  * test the scenario of kryo encountering an unregistered custom class.
+  */
+ public static class EdgeValue extends KryoSimpleWritable {
+   /** Edge value; null until assigned */
+   private Float realValue;
+
+   /** Constructor that leaves the edge value unset */
+   public EdgeValue() {
+   }
+
+   /**
+    * Constructor
+    * @param val Edge value
+    */
+   public EdgeValue(Float val) {
+     this.realValue = val;
+   }
+
+   /**
+    * Get edge value
+    * @return Edge value, or null if none was assigned
+    */
+   public Float get() {
+     return this.realValue;
+   }
+
+   /**
+    * Set edge value
+    * @param val Edge value
+    */
+   public void set(Float val) {
+     realValue = val;
+   }
+ }
+
+ /**
+  * Creating a custom message value class to force kryo to
+  * register with a new ID. Please note that a custom
+  * class containing a double list is not
+  * necessary for the page rank application. It is only
+  * used for testing the scenario of kryo encountering an
+  * unregistered custom class.
+  */
+ public static class MessageValue extends KryoSimpleWritable {
+   /**
+    * Storing the message in a list to test the list type.
+    * Stays null after the no-arg constructor until a value is set.
+    */
+   private List<Double> msgValue;
+
+   /** Constructor; the value stays unset until {@link #set(Double)}. */
+   public MessageValue() {
+   }
+
+   /**
+    * Constructor
+    * @param val Message value
+    */
+   public MessageValue(Double val) {
+     msgValue = new ArrayList<>();
+     msgValue.add(val);
+   }
+
+   /**
+    * Get message value
+    * @return Message value
+    * @throws NullPointerException if no value has been stored yet
+    */
+   public Double get() {
+     return msgValue.get(0);
+   }
+
+   /**
+    * Set message value. Also works on an instance created through the
+    * no-arg constructor, in which case the backing list is created on
+    * first use.
+    * @param val Message value
+    */
+   public void set(Double val) {
+     if (msgValue == null) {
+       msgValue = new ArrayList<>();
+     }
+     if (msgValue.isEmpty()) {
+       // No slot 0 to overwrite yet.
+       msgValue.add(val);
+     } else {
+       msgValue.set(0, val);
+     }
+   }
+ }
+
+
+ /**
+ * Aggregator for getting max double value
+ */
+ public static class DoubleMaxWrapperAggregator extends
+ BasicAggregator<VertexValue> {
+ @Override
+ public void aggregate(VertexValue value) {
+ getAggregatedValue().set(
+ Math.max(getAggregatedValue().get(), value.get()));
+ }
+
+ @Override
+ public VertexValue createInitialValue() {
+ return new VertexValue(Double.NEGATIVE_INFINITY);
+ }
+ }
+
+ /**
+ * Aggregator for getting min double value.
+ */
+ public static class DoubleMinWrapperAggregator
+ extends BasicAggregator<VertexValue> {
+ @Override
+ public void aggregate(VertexValue value) {
+ getAggregatedValue().set(
+ Math.min(getAggregatedValue().get(), value.get()));
+ }
+
+ @Override
+ public VertexValue createInitialValue() {
+ return new VertexValue(Double.MAX_VALUE);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/b2d77411/giraph-examples/src/test/java/org/apache/giraph/examples/TestKryoPageRank.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/TestKryoPageRank.java b/giraph-examples/src/test/java/org/apache/giraph/examples/TestKryoPageRank.java
new file mode 100644
index 0000000..69f5a83
--- /dev/null
+++ b/giraph-examples/src/test/java/org/apache/giraph/examples/TestKryoPageRank.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.examples;
+
+import org.apache.giraph.BspCase;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.job.GiraphJob;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test page rank with kryo wrapper
+ */
+public class TestKryoPageRank extends BspCase {
+
+ /**
+ * Constructor
+ */
+ public TestKryoPageRank() {
+ super(TestPageRank.class.getName());
+ }
+
+ @Test
+ public void testKryoPageRank()
+ throws ClassNotFoundException, IOException, InterruptedException {
+ testPageRankWithKryoWrapper(1);
+ }
+
+ @Test
+ public void testKryoPageRankTenThreadsCompute()
+ throws ClassNotFoundException, IOException, InterruptedException {
+ testPageRankWithKryoWrapper(10);
+ }
+
+
+ /**
+ * Testing simple page rank by wrapping vertex value, edge
+ * and message values with kryo wrapper.
+ *
+ * @param numComputeThreads Number of compute threads to use
+ * @throws java.io.IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ private void testPageRankWithKryoWrapper(int numComputeThreads)
+ throws IOException, InterruptedException, ClassNotFoundException {
+ GiraphConfiguration conf = new GiraphConfiguration();
+ conf.setComputationClass(PageRankWithKryoSimpleWritable.class);
+ conf.setVertexInputFormatClass(
+ PageRankWithKryoSimpleWritable.PageRankWithKryoVertexInputFormat.class);
+ conf.setWorkerContextClass(
+ PageRankWithKryoSimpleWritable.PageRankWithKryoWorkerContext.class);
+ conf.setMasterComputeClass(
+ PageRankWithKryoSimpleWritable.PageRankWithKryoMasterCompute.class);
+ conf.setNumComputeThreads(numComputeThreads);
+ // Set enough partitions to generate randomness on the compute side
+ if (numComputeThreads != 1) {
+ GiraphConstants.USER_PARTITION_COUNT.set(conf, numComputeThreads * 5);
+ }
+ GiraphJob job = prepareJob(getCallingMethodName(), conf);
+ assertTrue(job.run(true));
+ if (!runningInDistributedMode()) {
+ double maxPageRank =
+ PageRankWithKryoSimpleWritable.PageRankWithKryoWorkerContext.getFinalMax();
+ double minPageRank =
+ PageRankWithKryoSimpleWritable.PageRankWithKryoWorkerContext.getFinalMin();
+ long numVertices =
+ PageRankWithKryoSimpleWritable.PageRankWithKryoWorkerContext.getFinalSum();
+ System.out.println(getCallingMethodName() + ": maxPageRank=" +
+ maxPageRank + " minPageRank=" +
+ minPageRank + " numVertices=" + numVertices + ", " +
+ " numComputeThreads=" + numComputeThreads);
+ assertEquals(34.03, maxPageRank, 0.001);
+ assertEquals(0.03, minPageRank, 0.00001);
+ assertEquals(5L, numVertices);
+ }
+ }
+}