You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ni...@apache.org on 2013/06/20 20:24:12 UTC

[1/2] GIRAPH-683: Jython for Computation (nitay)

Updated Branches:
  refs/heads/trunk 1eaddd183 -> 8f89bd85a


http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
index de17157..486c6db 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
@@ -19,7 +19,6 @@
 package org.apache.giraph.job;
 
 import org.apache.giraph.combiner.Combiner;
-import org.apache.giraph.graph.Computation;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.OutEdges;
@@ -30,17 +29,14 @@ import org.apache.giraph.graph.VertexValueFactory;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexOutputFormat;
-import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
 
-import java.lang.reflect.Type;
-import java.util.List;
-
 import static org.apache.giraph.conf.GiraphConstants.VERTEX_EDGES_CLASS;
 import static org.apache.giraph.conf.GiraphConstants.VERTEX_RESOLVER_CLASS;
+import static org.apache.giraph.utils.ReflectionUtils.getTypeArguments;
 
 /**
  * GiraphConfigurationValidator attempts to verify the consistency of
@@ -69,26 +65,15 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
   private static final int VALUE_PARAM_INDEX = 1;
   /** E param vertex index in classList */
   private static final int EDGE_PARAM_INDEX = 2;
-  /** M2 param vertex index in classList */
-  private static final int OUTGOING_MSG_PARAM_INDEX = 4;
   /** M param vertex combiner index in classList */
   private static final int MSG_COMBINER_PARAM_INDEX = 1;
   /** E param edge input format index in classList */
   private static final int EDGE_PARAM_EDGE_INPUT_FORMAT_INDEX = 1;
   /** E param vertex edges index in classList */
-  private static final int EDGE_PARAM_VERTEX_EDGES_INDEX = 1;
+  private static final int EDGE_PARAM_OUT_EDGES_INDEX = 1;
   /** V param vertex value factory index in classList */
   private static final int VALUE_PARAM_VERTEX_VALUE_FACTORY_INDEX = 0;
 
-  /** Vertex Index Type */
-  private Type vertexIndexType;
-  /** Vertex Value Type */
-  private Type vertexValueType;
-  /** Edge Value Type */
-  private Type edgeValueType;
-  /** Outgoing Message Type */
-  private Type outgoingMessageValueType;
-
   /**
    * The Configuration object for use in the validation test.
    */
@@ -105,6 +90,42 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
   }
 
   /**
+   * Get vertex id type
+   *
+   * @return vertex id type
+   */
+  private Class<? extends WritableComparable> vertexIndexType() {
+    return conf.getGiraphTypes().getVertexIdClass();
+  }
+
+  /**
+   * Get vertex value type
+   *
+   * @return vertex value type
+   */
+  private Class<? extends Writable> vertexValueType() {
+    return conf.getGiraphTypes().getVertexValueClass();
+  }
+
+  /**
+   * Get edge value type
+   *
+   * @return edge value type
+   */
+  private Class<? extends Writable> edgeValueType() {
+    return conf.getGiraphTypes().getEdgeValueClass();
+  }
+
+  /**
+   * Get outgoing message value type
+   *
+   * @return outgoing message value type
+   */
+  private Class<? extends Writable> outgoingMessageValueType() {
+    return conf.getGiraphTypes().getOutgoingMessageValueClass();
+  }
+
+  /**
    * Make sure that all registered classes have matching types.  This
    * is a little tricky due to type erasure, cannot simply get them from
    * the class type arguments.  Also, set the vertex index, vertex value,
@@ -112,14 +133,6 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
    */
   public void validateConfiguration() {
     checkConfiguration();
-    Class<? extends Computation<I, V, E, M1, M2>> computationClass =
-      conf.getComputationClass();
-    List<Class<?>> classList = ReflectionUtils.getTypeArguments(
-      Computation.class, computationClass);
-    vertexIndexType = classList.get(ID_PARAM_INDEX);
-    vertexValueType = classList.get(VALUE_PARAM_INDEX);
-    edgeValueType = classList.get(EDGE_PARAM_INDEX);
-    outgoingMessageValueType = classList.get(OUTGOING_MSG_PARAM_INDEX);
     verifyOutEdgesGenericTypes();
     verifyVertexInputFormatGenericTypes();
     verifyEdgeInputFormatGenericTypes();
@@ -148,10 +161,7 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
       throw new IllegalArgumentException("checkConfiguration: No valid " +
           GiraphConstants.MIN_WORKERS);
     }
-    if (conf.getComputationClass() == null) {
-      throw new IllegalArgumentException("checkConfiguration: Null " +
-          GiraphConstants.COMPUTATION_CLASS.getKey());
-    }
+    conf.createComputationFactory().checkConfiguration(conf);
     if (conf.getVertexInputFormatClass() == null &&
         conf.getEdgeInputFormatClass() == null) {
       throw new IllegalArgumentException("checkConfiguration: One of " +
@@ -183,28 +193,11 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
    */
   private void verifyOutEdgesGenericTypesClass(
       Class<? extends OutEdges<I, E>> outEdgesClass) {
-    List<Class<?>> classList = ReflectionUtils.getTypeArguments(
-        OutEdges.class, outEdgesClass);
-    // OutEdges implementations can be generic, in which case there are no
-    // types to check.
-    if (classList.isEmpty()) {
-      return;
-    }
-    if (classList.get(ID_PARAM_INDEX) != null &&
-        !vertexIndexType.equals(classList.get(ID_PARAM_INDEX))) {
-      throw new IllegalArgumentException(
-          "checkClassTypes: Vertex index types don't match, " +
-              "vertex - " + vertexIndexType +
-              ", vertex edges - " + classList.get(ID_PARAM_INDEX));
-    }
-    if (classList.get(EDGE_PARAM_VERTEX_EDGES_INDEX) != null &&
-        !edgeValueType.equals(classList.get(EDGE_PARAM_VERTEX_EDGES_INDEX))) {
-      throw new IllegalArgumentException(
-          "checkClassTypes: Edge value types don't match, " +
-              "vertex - " + edgeValueType +
-              ", vertex edges - " +
-              classList.get(EDGE_PARAM_VERTEX_EDGES_INDEX));
-    }
+    Class<?>[] classList = getTypeArguments(OutEdges.class, outEdgesClass);
+    checkAssignable(classList, ID_PARAM_INDEX, vertexIndexType(),
+        OutEdges.class, "vertex index");
+    checkAssignable(classList, EDGE_PARAM_OUT_EDGES_INDEX, edgeValueType(),
+        OutEdges.class, "edge value");
   }
 
   /** Verify matching generic types in OutEdges. */
@@ -222,33 +215,14 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
     Class<? extends VertexInputFormat<I, V, E>> vertexInputFormatClass =
       conf.getVertexInputFormatClass();
     if (vertexInputFormatClass != null) {
-      List<Class<?>> classList =
-          ReflectionUtils.getTypeArguments(
-              VertexInputFormat.class, vertexInputFormatClass);
-      if (classList.get(ID_PARAM_INDEX) == null) {
-        LOG.warn("Input format vertex index type is not known");
-      } else if (!vertexIndexType.equals(classList.get(ID_PARAM_INDEX))) {
-        throw new IllegalArgumentException(
-            "checkClassTypes: Vertex index types don't match, " +
-                "vertex - " + vertexIndexType +
-                ", vertex input format - " + classList.get(ID_PARAM_INDEX));
-      }
-      if (classList.get(VALUE_PARAM_INDEX) == null) {
-        LOG.warn("Input format vertex value type is not known");
-      } else if (!vertexValueType.equals(classList.get(VALUE_PARAM_INDEX))) {
-        throw new IllegalArgumentException(
-            "checkClassTypes: Vertex value types don't match, " +
-                "vertex - " + vertexValueType +
-                ", vertex input format - " + classList.get(VALUE_PARAM_INDEX));
-      }
-      if (classList.get(EDGE_PARAM_INDEX) == null) {
-        LOG.warn("Input format edge value type is not known");
-      } else if (!edgeValueType.equals(classList.get(EDGE_PARAM_INDEX))) {
-        throw new IllegalArgumentException(
-            "checkClassTypes: Edge value types don't match, " +
-                "vertex - " + edgeValueType +
-                ", vertex input format - " + classList.get(EDGE_PARAM_INDEX));
-      }
+      Class<?>[] classList =
+          getTypeArguments(VertexInputFormat.class, vertexInputFormatClass);
+      checkAssignable(classList, ID_PARAM_INDEX, vertexIndexType(),
+          VertexInputFormat.class, "vertex index");
+      checkAssignable(classList, VALUE_PARAM_INDEX, vertexValueType(),
+          VertexInputFormat.class, "vertex value");
+      checkAssignable(classList, EDGE_PARAM_INDEX, edgeValueType(),
+          VertexInputFormat.class, "edge value");
     }
   }
 
@@ -257,27 +231,12 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
     Class<? extends EdgeInputFormat<I, E>> edgeInputFormatClass =
         conf.getEdgeInputFormatClass();
     if (edgeInputFormatClass != null) {
-      List<Class<?>> classList =
-          ReflectionUtils.getTypeArguments(
-              EdgeInputFormat.class, edgeInputFormatClass);
-      if (classList.get(ID_PARAM_INDEX) == null) {
-        LOG.warn("Input format vertex index type is not known");
-      } else if (!vertexIndexType.equals(classList.get(ID_PARAM_INDEX))) {
-        throw new IllegalArgumentException(
-            "checkClassTypes: Vertex index types don't match, " +
-                "vertex - " + vertexIndexType +
-                ", edge input format - " + classList.get(ID_PARAM_INDEX));
-      }
-      if (classList.get(EDGE_PARAM_EDGE_INPUT_FORMAT_INDEX) == null) {
-        LOG.warn("Input format edge value type is not known");
-      } else if (!edgeValueType.equals(
-          classList.get(EDGE_PARAM_EDGE_INPUT_FORMAT_INDEX))) {
-        throw new IllegalArgumentException(
-            "checkClassTypes: Edge value types don't match, " +
-                "vertex - " + edgeValueType +
-                ", edge input format - " +
-                classList.get(EDGE_PARAM_EDGE_INPUT_FORMAT_INDEX));
-      }
+      Class<?>[] classList =
+          getTypeArguments(EdgeInputFormat.class, edgeInputFormatClass);
+      checkAssignable(classList, ID_PARAM_INDEX,
+          vertexIndexType(), EdgeInputFormat.class, "vertex index");
+      checkAssignable(classList, EDGE_PARAM_EDGE_INPUT_FORMAT_INDEX,
+          edgeValueType(), EdgeInputFormat.class, "edge value");
     }
   }
 
@@ -286,22 +245,12 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
     Class<? extends Combiner<I, M2>> vertexCombinerClass =
       conf.getCombinerClass();
     if (vertexCombinerClass != null) {
-      List<Class<?>> classList =
-        ReflectionUtils.getTypeArguments(
-          Combiner.class, vertexCombinerClass);
-      if (!vertexIndexType.equals(classList.get(ID_PARAM_INDEX))) {
-        throw new IllegalArgumentException(
-          "checkClassTypes: Vertex index types don't match, " +
-            "vertex - " + vertexIndexType +
-            ", vertex combiner - " + classList.get(ID_PARAM_INDEX));
-      }
-      if (!outgoingMessageValueType.equals(
-          classList.get(MSG_COMBINER_PARAM_INDEX))) {
-        throw new IllegalArgumentException(
-          "checkClassTypes: Message value types don't match, " +
-            "vertex - " + outgoingMessageValueType +
-            ", vertex combiner - " + classList.get(MSG_COMBINER_PARAM_INDEX));
-      }
+      Class<?>[] classList =
+          getTypeArguments(Combiner.class, vertexCombinerClass);
+      checkEquals(classList, ID_PARAM_INDEX, vertexIndexType(),
+          Combiner.class, "vertex index");
+      checkEquals(classList, MSG_COMBINER_PARAM_INDEX,
+          outgoingMessageValueType(), Combiner.class, "message value");
     }
   }
 
@@ -310,33 +259,14 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
     Class<? extends VertexOutputFormat<I, V, E>>
       vertexOutputFormatClass = conf.getVertexOutputFormatClass();
     if (vertexOutputFormatClass != null) {
-      List<Class<?>> classList =
-        ReflectionUtils.getTypeArguments(
-          VertexOutputFormat.class, vertexOutputFormatClass);
-      if (classList.get(ID_PARAM_INDEX) == null) {
-        LOG.warn("Output format vertex index type is not known");
-      } else if (!vertexIndexType.equals(classList.get(ID_PARAM_INDEX))) {
-        throw new IllegalArgumentException(
-          "checkClassTypes: Vertex index types don't match, " +
-            "vertex - " + vertexIndexType +
-            ", vertex output format - " + classList.get(ID_PARAM_INDEX));
-      }
-      if (classList.get(VALUE_PARAM_INDEX) == null) {
-        LOG.warn("Output format vertex value type is not known");
-      } else if (!vertexValueType.equals(classList.get(VALUE_PARAM_INDEX))) {
-        throw new IllegalArgumentException(
-          "checkClassTypes: Vertex value types don't match, " +
-            "vertex - " + vertexValueType +
-            ", vertex output format - " + classList.get(VALUE_PARAM_INDEX));
-      }
-      if (classList.get(EDGE_PARAM_INDEX) == null) {
-        LOG.warn("Output format edge value type is not known");
-      } else if (!edgeValueType.equals(classList.get(EDGE_PARAM_INDEX))) {
-        throw new IllegalArgumentException(
-          "checkClassTypes: Edge value types don't match, " +
-            "vertex - " + edgeValueType +
-            ", vertex output format - " + classList.get(EDGE_PARAM_INDEX));
-      }
+      Class<?>[] classList =
+        getTypeArguments(VertexOutputFormat.class, vertexOutputFormatClass);
+      checkAssignable(classList, ID_PARAM_INDEX, vertexIndexType(),
+          VertexOutputFormat.class, "vertex index");
+      checkAssignable(classList, VALUE_PARAM_INDEX, vertexValueType(),
+          VertexOutputFormat.class, "vertex value");
+      checkAssignable(classList, EDGE_PARAM_INDEX, edgeValueType(),
+          VertexOutputFormat.class, "edge value");
     }
   }
 
@@ -347,53 +277,72 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
     if (DefaultVertexValueFactory.class.equals(vvfClass)) {
       return;
     }
-    List<Class<?>> classList = ReflectionUtils.getTypeArguments(
-        VertexValueFactory.class, vvfClass);
-    if (classList.get(VALUE_PARAM_VERTEX_VALUE_FACTORY_INDEX) != null &&
-        !vertexValueType.equals(
-            classList.get(VALUE_PARAM_VERTEX_VALUE_FACTORY_INDEX))) {
-      throw new IllegalArgumentException(
-          "checkClassTypes: Vertex value types don't match, " +
-              "vertex - " + vertexValueType +
-              ", vertex value factory - " +
-              classList.get(VALUE_PARAM_VERTEX_VALUE_FACTORY_INDEX));
-    }
+    Class<?>[] classList = getTypeArguments(VertexValueFactory.class, vvfClass);
+    checkEquals(classList, VALUE_PARAM_VERTEX_VALUE_FACTORY_INDEX,
+        vertexValueType(), VertexValueFactory.class, "vertex value");
   }
 
-  /** If there is a vertex resolver,
-   * validate the generic parameter types. */
+  /**
+   * If there is a vertex resolver,
+   * validate the generic parameter types.
+   */
   private void verifyVertexResolverGenericTypes() {
     Class<? extends VertexResolver<I, V, E>>
-      vrClass = conf.getVertexResolverClass();
-    if (!DefaultVertexResolver.class.isAssignableFrom(vrClass)) {
+        vrClass = conf.getVertexResolverClass();
+    if (DefaultVertexResolver.class.equals(vrClass)) {
       return;
     }
-    Class<? extends DefaultVertexResolver<I, V, E>>
-      dvrClass =
-        (Class<? extends DefaultVertexResolver<I, V, E>>) vrClass;
-    List<Class<?>> classList =
-      ReflectionUtils.getTypeArguments(
-          DefaultVertexResolver.class, dvrClass);
-    if (classList.get(ID_PARAM_INDEX) != null &&
-      !vertexIndexType.equals(classList.get(ID_PARAM_INDEX))) {
-      throw new IllegalArgumentException(
-        "checkClassTypes: Vertex index types don't match, " +
-          "vertex - " + vertexIndexType +
-          ", vertex resolver - " + classList.get(ID_PARAM_INDEX));
-    }
-    if (classList.get(VALUE_PARAM_INDEX) != null &&
-      !vertexValueType.equals(classList.get(VALUE_PARAM_INDEX))) {
+    Class<?>[] classList =
+        getTypeArguments(VertexResolver.class, vrClass);
+    checkEquals(classList, ID_PARAM_INDEX, vertexIndexType(),
+        VertexResolver.class, "vertex index");
+    checkEquals(classList, VALUE_PARAM_INDEX, vertexValueType(),
+        VertexResolver.class, "vertex value");
+    checkEquals(classList, EDGE_PARAM_INDEX, edgeValueType(),
+        VertexResolver.class, "edge value");
+  }
+
+  /**
+   * Check that the type from computation equals the type from the class.
+   *
+   * @param classList type arguments resolved for klass, null where unknown
+   * @param index array index of class to check
+   * @param classFromComputation class from computation
+   * @param klass Class type we're checking, only used for printing name
+   * @param typeName Name of type we're checking
+   */
+  private static void checkEquals(Class<?>[] classList, int index,
+      Class<?> classFromComputation, Class<?> klass, String typeName) {
+    if (classList[index] == null) {
+      LOG.warn(klass.getSimpleName() + " " + typeName + " type is not known");
+    } else if (!classList[index].equals(classFromComputation)) {
       throw new IllegalArgumentException(
-        "checkClassTypes: Vertex value types don't match, " +
-          "vertex - " + vertexValueType +
-          ", vertex resolver - " + classList.get(VALUE_PARAM_INDEX));
+          "checkClassTypes: " + typeName + " types not equal, " +
+              "computation - " + classFromComputation +
+              ", " + klass.getSimpleName() + " - " +
+              classList[index]);
     }
-    if (classList.get(EDGE_PARAM_INDEX) != null &&
-      !edgeValueType.equals(classList.get(EDGE_PARAM_INDEX))) {
+  }
+
+  /**
+   * Check that the type from computation is assignable to type from the class.
+   *
+   * @param classList type arguments resolved for klass, null where unknown
+   * @param index array index of class to check
+   * @param classFromComputation class from computation
+   * @param klass Class type we're checking, only used for printing name
+   * @param typeName Name of type we're checking
+   */
+  private static void checkAssignable(Class<?>[] classList, int index,
+      Class<?> classFromComputation, Class<?> klass, String typeName) {
+    if (classList[index] == null) {
+      LOG.warn(klass.getSimpleName() + " " + typeName + " type is not known");
+    } else if (!classList[index].isAssignableFrom(classFromComputation)) {
       throw new IllegalArgumentException(
-        "checkClassTypes: Edge value types don't match, " +
-          "vertex - " + edgeValueType +
-          ", vertex resolver - " + classList.get(EDGE_PARAM_INDEX));
+          "checkClassTypes: " + typeName + " types not assignable, " +
+              "computation - " + classFromComputation +
+              ", " + klass.getSimpleName() + " - " +
+              classList[index]);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/jython/DeployType.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/jython/DeployType.java b/giraph-core/src/main/java/org/apache/giraph/jython/DeployType.java
new file mode 100644
index 0000000..d916119
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/jython/DeployType.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.jython;
+
+/**
+ * How a file (such as a Jython script) is deployed so a job can read it.
+ */
+public enum DeployType {
+  /** Classpath resource packaged with the jar */
+  RESOURCE,
+  /** File distributed through Hadoop's DistributedCache */
+  DISTRIBUTED_CACHE
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/jython/JythonComputationFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/jython/JythonComputationFactory.java b/giraph-core/src/main/java/org/apache/giraph/jython/JythonComputationFactory.java
new file mode 100644
index 0000000..6db5f65
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/jython/JythonComputationFactory.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.jython;
+
+import org.apache.giraph.conf.EnumConfOption;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.StrConfOption;
+import org.apache.giraph.graph.Computation;
+import org.apache.giraph.graph.ComputationFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+import org.python.core.PyObject;
+import org.python.util.PythonInterpreter;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.io.Closeables;
+
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import static org.apache.giraph.utils.DistributedCacheUtils.getLocalCacheFile;
+
+/**
+ * Factory for creating Jython Computation from python scripts
+ */
+public class JythonComputationFactory implements ComputationFactory {
+  /** How the script is deployed: jar resource or distributed cache */
+  public static final EnumConfOption<DeployType> JYTHON_DEPLOY_TYPE =
+      EnumConfOption.create("giraph.jython.deploy.type",
+          DeployType.class, DeployType.DISTRIBUTED_CACHE);
+  /** Path to Jython script */
+  public static final StrConfOption JYTHON_SCRIPT_PATH =
+      new StrConfOption("giraph.jython.path", "_script_not_set_");
+  /** Name of Computation class in Jython script */
+  public static final StrConfOption JYTHON_COMPUTATION_CLASS =
+      new StrConfOption("giraph.jython.class", "_computation_class_not_set_");
+
+  /** Logger */
+  private static final Logger LOG =
+      Logger.getLogger(JythonComputationFactory.class);
+
+  @Override
+  public void initComputation(ImmutableClassesGiraphConfiguration conf) {
+    String scriptPath = JYTHON_SCRIPT_PATH.get(conf);
+    InputStream pythonStream = getPythonScriptStream(conf, scriptPath);
+    try {
+      PythonInterpreter interpreter = new PythonInterpreter();
+      if (LOG.isInfoEnabled()) {
+        LOG.info("initComputation: Jython loading script " + scriptPath);
+      }
+      interpreter.execfile(pythonStream);
+      // Fetch the configured class from the interpreter after the script ran
+      String className = computationName(conf);
+      PyObject pyComputationModule = interpreter.get(className);
+      // Cache it statically; getComputation() instantiates it later
+      JythonUtils.setPythonComputationModule(pyComputationModule);
+    } finally {
+      Closeables.closeQuietly(pythonStream);
+    }
+  }
+
+  /**
+   * Get an {@link InputStream} for the jython script.
+   *
+   * @param conf Configuration
+   * @param path script path
+   * @return {@link InputStream} for reading script
+   */
+  private InputStream getPythonScriptStream(Configuration conf, String path) {
+    InputStream stream = null;
+    DeployType deployType = JYTHON_DEPLOY_TYPE.get(conf);
+    switch (deployType) {
+    case RESOURCE:
+      if (LOG.isInfoEnabled()) {
+        LOG.info("getPythonScriptStream: Reading Jython Computation " +
+            "from resource at " + path);
+      }
+      stream = getClass().getResourceAsStream(path);
+      if (stream == null) {
+        throw new IllegalStateException("getPythonScriptStream: Failed to " +
+            "open Jython script from resource at " + path);
+      }
+      break;
+    case DISTRIBUTED_CACHE:
+      if (LOG.isInfoEnabled()) {
+        LOG.info("getPythonScriptStream: Reading Jython Computation " +
+            "from DistributedCache at " + path);
+      }
+      Optional<Path> localPath = getLocalCacheFile(conf, path);
+      if (!localPath.isPresent()) {
+        throw new IllegalStateException("getPythonScriptStream: Failed to " +
+            "find Jython script in local DistributedCache matching " + path);
+      }
+      String pathStr = localPath.get().toString();
+      try {
+        stream = new BufferedInputStream(new FileInputStream(pathStr));
+      } catch (IOException e) {
+        throw new IllegalStateException("getPythonScriptStream: Failed open " +
+            "Jython script from DistributedCache at " + localPath, e);
+      }
+      break;
+    default:
+      throw new IllegalArgumentException("getPythonScriptStream: Unknown " +
+          "Jython script deployment type: " + deployType);
+    }
+    return stream;
+  }
+
+  @Override
+  public Computation getComputation(ImmutableClassesGiraphConfiguration conf) {
+    PyObject pyComputationModule = JythonUtils.getPythonComputationModule();
+    Preconditions.checkNotNull(pyComputationModule);
+    // Calling the cached Python object yields a fresh computation instance
+    PyObject pyComputationObj = pyComputationModule.__call__();
+    Object computationObj = pyComputationObj.__tojava__(Computation.class);
+    if (!(computationObj instanceof Computation)) {
+      throw new IllegalStateException("getComputation: Jython object " +
+          computationName(conf) + " is not a Computation");
+    }
+
+    conf.configureIfPossible(computationObj);
+    return (Computation) computationObj;
+  }
+
+  @Override
+  public void checkConfiguration(ImmutableClassesGiraphConfiguration conf) {
+    if (JYTHON_SCRIPT_PATH.isDefaultValue(conf)) {
+      throw new IllegalStateException("checkConfiguration: " +
+          JYTHON_SCRIPT_PATH.getKey() + " not set in configuration");
+    }
+    if (JYTHON_COMPUTATION_CLASS.isDefaultValue(conf)) {
+      throw new IllegalStateException("checkConfiguration: " +
+          JYTHON_COMPUTATION_CLASS.getKey() + " not set in configuration");
+    }
+  }
+
+  @Override
+  public String computationName(GiraphConfiguration conf) {
+    return JYTHON_COMPUTATION_CLASS.get(conf);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/jython/JythonUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/jython/JythonUtils.java b/giraph-core/src/main/java/org/apache/giraph/jython/JythonUtils.java
new file mode 100644
index 0000000..f456aa8
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/jython/JythonUtils.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.jython;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphTypes;
+import org.apache.giraph.graph.Language;
+import org.python.core.PyObject;
+
+import static org.apache.giraph.conf.GiraphConstants.COMPUTATION_FACTORY_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.COMPUTATION_LANGUAGE;
+import static org.apache.giraph.jython.JythonComputationFactory.JYTHON_COMPUTATION_CLASS;
+import static org.apache.giraph.jython.JythonComputationFactory.JYTHON_SCRIPT_PATH;
+
+/**
+ * Helpers for running jobs with Jython.
+ */
+public class JythonUtils {
+  /** Python object holding the computation, shared statically once set */
+  private static volatile PyObject JYTHON_COMPUTATION_MODULE;
+
+  /** Don't construct */
+  private JythonUtils() { }
+
+  /**
+   * Store the python computation module in the static holder.
+   *
+   * @param mod python computation module
+   */
+  public static void setPythonComputationModule(PyObject mod) {
+    JYTHON_COMPUTATION_MODULE = mod;
+  }
+
+  /**
+   * Get the stored python computation module.
+   *
+   * @return python computation module, or null if none has been set
+   */
+  public static PyObject getPythonComputationModule() {
+    return JYTHON_COMPUTATION_MODULE;
+  }
+
+  /**
+   * Sets up the Configuration for using Jython
+   *
+   * @param conf Configuration to set up
+   * @param scriptPath Path to Jython script (resource or distributed cache)
+   * @param klassName Class name of Jython Computation
+   * @param types GiraphTypes, applied to conf through writeIfUnset
+   */
+  public static void init(GiraphConfiguration conf, String scriptPath,
+      String klassName, GiraphTypes types) {
+    types.writeIfUnset(conf);
+    COMPUTATION_LANGUAGE.set(conf, Language.JYTHON);
+    COMPUTATION_FACTORY_CLASS.set(conf, JythonComputationFactory.class);
+    JYTHON_SCRIPT_PATH.set(conf, scriptPath);
+    JYTHON_COMPUTATION_CLASS.set(conf, klassName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/jython/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/jython/package-info.java b/giraph-core/src/main/java/org/apache/giraph/jython/package-info.java
new file mode 100644
index 0000000..4188ddb
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/jython/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Jython integration.
+ */
+package org.apache.giraph.jython;

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java b/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java
index a12ef58..13bb492 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java
@@ -20,7 +20,9 @@ package org.apache.giraph.master;
 
 import org.apache.giraph.combiner.Combiner;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.TypesHolder;
 import org.apache.giraph.graph.Computation;
+import org.apache.giraph.graph.Language;
 import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.giraph.utils.WritableUtils;
 import org.apache.hadoop.io.Writable;
@@ -29,7 +31,8 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.lang.reflect.Modifier;
-import java.util.List;
+
+import static org.apache.giraph.conf.GiraphConstants.COMPUTATION_LANGUAGE;
 
 /**
  * Holds Computation and Combiner class.
@@ -92,17 +95,25 @@ public class SuperstepClasses implements Writable {
    * @param conf Configuration to verify this with
    */
   public void verifyTypesMatch(ImmutableClassesGiraphConfiguration conf) {
-    List<Class<?>> computationTypes = ReflectionUtils.getTypeArguments(
-        Computation.class, computationClass);
-    verifyTypes(conf.getVertexIdClass(), computationTypes.get(0),
+    // In some cases, for example when using Jython, the Computation class may
+    // not be set. This is because it is created by a ComputationFactory
+    // dynamically and not known ahead of time. In this case there is nothing to
+    // verify here so we bail.
+    if (COMPUTATION_LANGUAGE.get(conf) == Language.JYTHON) {
+      return;
+    }
+
+    Class<?>[] computationTypes = ReflectionUtils.getTypeArguments(
+        TypesHolder.class, computationClass);
+    verifyTypes(conf.getVertexIdClass(), computationTypes[0],
         "Vertex id", computationClass);
-    verifyTypes(conf.getVertexValueClass(), computationTypes.get(1),
+    verifyTypes(conf.getVertexValueClass(), computationTypes[1],
         "Vertex value", computationClass);
-    verifyTypes(conf.getEdgeValueClass(), computationTypes.get(2),
+    verifyTypes(conf.getEdgeValueClass(), computationTypes[2],
         "Edge value", computationClass);
-    verifyTypes(conf.getOutgoingMessageValueClass(), computationTypes.get(3),
+    verifyTypes(conf.getOutgoingMessageValueClass(), computationTypes[3],
         "Previous outgoing and new incoming message", computationClass);
-    Class<?> outgoingMessageType = computationTypes.get(4);
+    Class<?> outgoingMessageType = computationTypes[4];
     if (outgoingMessageType.isInterface()) {
       throw new IllegalStateException("verifyTypesMatch: " +
           "Message type must be concrete class " + outgoingMessageType);
@@ -112,11 +123,11 @@ public class SuperstepClasses implements Writable {
           "Message type can't be abstract class" + outgoingMessageType);
     }
     if (combinerClass != null) {
-      List<Class<?>> combinerTypes = ReflectionUtils.getTypeArguments(
+      Class<?>[] combinerTypes = ReflectionUtils.getTypeArguments(
           Combiner.class, combinerClass);
-      verifyTypes(conf.getVertexIdClass(), combinerTypes.get(0),
+      verifyTypes(conf.getVertexIdClass(), combinerTypes[0],
           "Vertex id", combinerClass);
-      verifyTypes(outgoingMessageType, combinerTypes.get(1),
+      verifyTypes(outgoingMessageType, combinerTypes[1],
           "Outgoing message", combinerClass);
     }
   }
@@ -154,7 +165,9 @@ public class SuperstepClasses implements Writable {
 
   @Override
   public String toString() {
-    return "(computation=" + computationClass.getName() + ",combiner=" +
+    String computationName = computationClass == null ? "_not_set_" :
+        computationClass.getName();
+    return "(computation=" + computationName + ",combiner=" +
         ((combinerClass == null) ? "null" : combinerClass.getName()) + ")";
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
index d8b121b..f0a7e4f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
@@ -17,8 +17,6 @@
  */
 package org.apache.giraph.utils;
 
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
 import org.apache.commons.cli.BasicParser;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
@@ -28,30 +26,39 @@ import org.apache.commons.cli.ParseException;
 import org.apache.giraph.Algorithm;
 import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.combiner.Combiner;
-import org.apache.giraph.graph.Computation;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.GiraphTypes;
+import org.apache.giraph.conf.TypesHolder;
 import org.apache.giraph.edge.OutEdges;
+import org.apache.giraph.graph.Computation;
 import org.apache.giraph.graph.VertexValueFactory;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexOutputFormat;
 import org.apache.giraph.io.formats.GiraphFileInputFormat;
 import org.apache.giraph.job.GiraphConfigurationValidator;
+import org.apache.giraph.jython.JythonUtils;
 import org.apache.giraph.master.MasterCompute;
 import org.apache.giraph.partition.Partition;
-import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.worker.WorkerContext;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.ZooKeeper;
 
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+
 import java.io.IOException;
 import java.util.List;
 
+import static org.apache.giraph.conf.GiraphConstants.COMPUTATION_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.TYPES_HOLDER_CLASS;
+
 /**
  * Translate command line args into Configuration Key-Value pairs.
  */
@@ -100,6 +107,10 @@ public final class ConfigurationUtils {
     OPTIONS.addOption("pc", "partitionClass", true, "Partition class");
     OPTIONS.addOption("vvf", "vertexValueFactoryClass", true,
         "Vertex value factory class");
+    OPTIONS.addOption("th", "typesHolder", true,
+        "Class that holds types. Needed only if Computation is not set");
+    OPTIONS.addOption("jyc", "jythonClass", true,
+        "Jython class name, used if computation passed in is a python script");
     OPTIONS.addOption("ca", "customArguments", true, "provide custom" +
         " arguments for the job configuration in the form:" +
         " -ca <param1>=<value1>,<param2>=<value2> -ca <param3>=<value3> etc." +
@@ -113,6 +124,29 @@ public final class ConfigurationUtils {
   private ConfigurationUtils() { }
 
   /**
+   * Get a class which is parameterized by the graph types defined by user.
+   * The types holder is actually an interface that any class which holds all of
+   * Giraph types can implement. It is used with reflection to infer the Giraph
+   * types.
+   *
+   * The type holders we try, in order, are:
+   * 1) The {@link TypesHolder} class directly.
+   * 2) The {@link Computation} class, as that holds all the types.
+   *
+   * @param conf Configuration
+   * @return {@link TypesHolder} or null if could not find one.
+   */
+  public static Class<? extends TypesHolder> getTypesHolderClass(
+      Configuration conf) {
+    Class<? extends TypesHolder> holder = TYPES_HOLDER_CLASS.get(conf);
+    if (holder == null) {
+      // No explicit types holder configured, fall back to the Computation.
+      holder = COMPUTATION_CLASS.get(conf);
+    }
+    return holder;
+  }
+
+  /**
    * Translate CLI arguments to GiraphRunner or 'bin/hadoop jar' into
    * Configuration Key-Value pairs.
    * @param giraphConf the current job Configuration.
@@ -199,52 +233,55 @@ public final class ConfigurationUtils {
    * Populate GiraphConfiguration for this job with all cmd line args found.
    * Any global configuration data that Giraph on any platform might need
    * should be captured here.
-   * @param giraphConfiguration config for this job run
-   * @param cmd parsed command line options to store in giraphConfiguration
+   *
+   * @param conf config for this job run
+   * @param cmd parsed command line options to store in conf
    * @param computationClassName the computation class (application) to run in
    *                             this job.
    * @param workers the number of worker tasks for this job run.
    */
   private static void populateGiraphConfiguration(final GiraphConfiguration
-    giraphConfiguration, final CommandLine cmd,
-      final String computationClassName,
-    final int workers) throws ClassNotFoundException, IOException {
-    giraphConfiguration.setWorkerConfiguration(workers, workers, 100.0f);
-    giraphConfiguration.setComputationClass(
-        (Class<? extends Computation>) Class.forName(computationClassName));
+    conf, final CommandLine cmd,
+    final String computationClassName, final int workers)
+    throws ClassNotFoundException, IOException {
+    conf.setWorkerConfiguration(workers, workers, 100.0f);
+    if (cmd.hasOption("typesHolder")) {
+      Class<? extends TypesHolder> typesHolderClass =
+          (Class<? extends TypesHolder>)
+              Class.forName(cmd.getOptionValue("typesHolder"));
+      TYPES_HOLDER_CLASS.set(conf, typesHolderClass);
+    }
     if (cmd.hasOption("c")) {
-      giraphConfiguration.setCombinerClass(
+      conf.setCombinerClass(
           (Class<? extends Combiner>) Class.forName(cmd.getOptionValue("c")));
     }
     if (cmd.hasOption("ve")) {
-      giraphConfiguration.setOutEdgesClass(
-          (Class<? extends OutEdges>)
-              Class.forName(cmd.getOptionValue("ve")));
+      conf.setOutEdgesClass(
+          (Class<? extends OutEdges>) Class.forName(cmd.getOptionValue("ve")));
     }
     if (cmd.hasOption("ive")) {
-      giraphConfiguration.setInputOutEdgesClass(
-          (Class<? extends OutEdges>)
-              Class.forName(cmd.getOptionValue("ive")));
+      conf.setInputOutEdgesClass(
+          (Class<? extends OutEdges>) Class.forName(cmd.getOptionValue("ive")));
     }
     if (cmd.hasOption("wc")) {
-      giraphConfiguration.setWorkerContextClass(
-          (Class<? extends WorkerContext>)
-              Class.forName(cmd.getOptionValue("wc")));
+      conf.setWorkerContextClass(
+          (Class<? extends WorkerContext>) Class
+              .forName(cmd.getOptionValue("wc")));
     }
     if (cmd.hasOption("mc")) {
-      giraphConfiguration.setMasterComputeClass(
-          (Class<? extends MasterCompute>)
-              Class.forName(cmd.getOptionValue("mc")));
+      conf.setMasterComputeClass(
+          (Class<? extends MasterCompute>) Class
+              .forName(cmd.getOptionValue("mc")));
     }
     if (cmd.hasOption("aw")) {
-      giraphConfiguration.setAggregatorWriterClass(
-          (Class<? extends AggregatorWriter>)
-              Class.forName(cmd.getOptionValue("aw")));
+      conf.setAggregatorWriterClass(
+          (Class<? extends AggregatorWriter>) Class
+              .forName(cmd.getOptionValue("aw")));
     }
     if (cmd.hasOption("vif")) {
-      giraphConfiguration.setVertexInputFormatClass(
-          (Class<? extends VertexInputFormat>)
-              Class.forName(cmd.getOptionValue("vif")));
+      conf.setVertexInputFormatClass(
+          (Class<? extends VertexInputFormat>) Class
+              .forName(cmd.getOptionValue("vif")));
     } else {
       if (LOG.isInfoEnabled()) {
         LOG.info("No vertex input format specified. Ensure your " +
@@ -252,9 +289,9 @@ public final class ConfigurationUtils {
       }
     }
     if (cmd.hasOption("eif")) {
-      giraphConfiguration.setEdgeInputFormatClass(
-          (Class<? extends EdgeInputFormat>)
-              Class.forName(cmd.getOptionValue("eif")));
+      conf.setEdgeInputFormatClass(
+          (Class<? extends EdgeInputFormat>) Class
+              .forName(cmd.getOptionValue("eif")));
     } else {
       if (LOG.isInfoEnabled()) {
         LOG.info("No edge input format specified. Ensure your " +
@@ -262,9 +299,9 @@ public final class ConfigurationUtils {
       }
     }
     if (cmd.hasOption("of")) {
-      giraphConfiguration.setVertexOutputFormatClass(
-          (Class<? extends VertexOutputFormat>)
-              Class.forName(cmd.getOptionValue("of")));
+      conf.setVertexOutputFormatClass(
+          (Class<? extends VertexOutputFormat>) Class
+              .forName(cmd.getOptionValue("of")));
     } else {
       if (LOG.isInfoEnabled()) {
         LOG.info("No output format specified. Ensure your OutputFormat " +
@@ -272,14 +309,13 @@ public final class ConfigurationUtils {
       }
     }
     if (cmd.hasOption("pc")) {
-      giraphConfiguration.setPartitionClass(
-          (Class<? extends Partition>)
-              Class.forName(cmd.getOptionValue("pc")));
+      conf.setPartitionClass(
+          (Class<? extends Partition>) Class.forName(cmd.getOptionValue("pc")));
     }
     if (cmd.hasOption("vvf")) {
-      giraphConfiguration.setVertexValueFactoryClass(
-          (Class<? extends VertexValueFactory>)
-              Class.forName(cmd.getOptionValue("vvf")));
+      conf.setVertexValueFactoryClass(
+          (Class<? extends VertexValueFactory>) Class
+              .forName(cmd.getOptionValue("vvf")));
     }
     if (cmd.hasOption("ca")) {
       for (String caOptionValue : cmd.getOptionValues("ca")) {
@@ -295,14 +331,14 @@ public final class ConfigurationUtils {
             LOG.info("Setting custom argument [" + parts[0] + "] to [" +
                 parts[1] + "] in GiraphConfiguration");
           }
-          giraphConfiguration.set(parts[0], parts[1]);
+          conf.set(parts[0], parts[1]);
         }
       }
     }
     // Now, we parse options that are specific to Hadoop MR Job
     if (cmd.hasOption("vif")) {
       if (cmd.hasOption("vip")) {
-        GiraphFileInputFormat.addVertexInputPath(giraphConfiguration,
+        GiraphFileInputFormat.addVertexInputPath(conf,
           new Path(cmd.getOptionValue("vip")));
       } else {
         if (LOG.isInfoEnabled()) {
@@ -313,7 +349,7 @@ public final class ConfigurationUtils {
     }
     if (cmd.hasOption("eif")) {
       if (cmd.hasOption("eip")) {
-        GiraphFileInputFormat.addEdgeInputPath(giraphConfiguration,
+        GiraphFileInputFormat.addEdgeInputPath(conf,
           new Path(cmd.getOptionValue("eip")));
       } else {
         if (LOG.isInfoEnabled()) {
@@ -324,11 +360,11 @@ public final class ConfigurationUtils {
     }
     // YARN-ONLY OPTIONS
     if (cmd.hasOption("yj")) {
-      giraphConfiguration.setYarnLibJars(cmd.getOptionValue("yj"));
+      conf.setYarnLibJars(cmd.getOptionValue("yj"));
     }
     if (cmd.hasOption("yh")) {
-      giraphConfiguration.setYarnTaskHeapMb(
-        Integer.parseInt(cmd.getOptionValue("yh")));
+      conf.setYarnTaskHeapMb(
+          Integer.parseInt(cmd.getOptionValue("yh")));
     }
     /*if[PURE_YARN]
     if (cmd.hasOption("of")) {
@@ -337,8 +373,8 @@ public final class ConfigurationUtils {
         Path outputDir =
             new Path(BASE_OUTPUT_PATH, cmd.getOptionValue("op"));
         outputDir =
-          outputDir.getFileSystem(giraphConfiguration).makeQualified(outputDir);
-        giraphConfiguration.set(
+          outputDir.getFileSystem(conf).makeQualified(outputDir);
+        conf.set(
             org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR,
             outputDir.toString());
 
@@ -351,6 +387,57 @@ public final class ConfigurationUtils {
     }
     end[PURE_YARN]*/
     // END YARN-ONLY OPTIONS
+    handleComputationClass(conf, cmd, computationClassName);
+  }
+
+  /**
+   * Set the computation: a ".py" path is a Jython script, else a Java class.
+   *
+   * @param conf Configuration
+   * @param cmd CommandLine
+   * @param computationClassName Java class name, or path to a ".py" script
+   * @throws ClassNotFoundException error finding class
+   */
+  private static void handleComputationClass(GiraphConfiguration conf,
+    CommandLine cmd, String computationClassName)
+    throws ClassNotFoundException {
+    if (computationClassName.endsWith(".py")) {
+      handleJythonComputation(conf, cmd, computationClassName);
+    } else {
+      conf.setComputationClass(
+          (Class<? extends Computation>) Class.forName(computationClassName));
+    }
+  }
+
+  /**
+   * Configure a Jython job; needs --jythonClass and --typesHolder options.
+   *
+   * @param conf Configuration
+   * @param cmd CommandLine
+   * @param scriptPath Path to python script (copied to HDFS if not hdfs://)
+   */
+  private static void handleJythonComputation(GiraphConfiguration conf,
+    CommandLine cmd, String scriptPath) {
+    String jythonClass = cmd.getOptionValue("jythonClass");
+    if (jythonClass == null) {
+      throw new IllegalArgumentException(
+          "handleJythonComputation: Need to set Jython Computation class " +
+          "name with --jythonClass");
+    }
+    String typesHolderClass = cmd.getOptionValue("typesHolder");
+    if (typesHolderClass == null) {
+      throw new IllegalArgumentException(
+          "handleJythonComputation: Need to set TypesHolder class name " +
+          "with --typesHolder");
+    }
+    // Copy the script to HDFS (if local) and add it to the DistributedCache.
+    Path path = new Path(scriptPath);
+    Path remotePath = DistributedCacheUtils.copyToHdfs(path, conf);
+
+    DistributedCache.addCacheFile(remotePath.toUri(), conf);
+    // NOTE(review): init() gets scriptPath, not remotePath; confirm intended.
+    GiraphTypes types = GiraphTypes.readFrom(conf);
+    JythonUtils.init(conf, scriptPath, jythonClass, types);
+  }
 
   /**
@@ -372,7 +459,7 @@ public final class ConfigurationUtils {
       Algorithm.class, "org.apache.giraph");
     System.out.print("  Supported algorithms:\n");
     for (Class<?> clazz : classes) {
-      if (Vertex.class.isAssignableFrom(clazz)) {
+      if (Computation.class.isAssignableFrom(clazz)) {
         Algorithm algorithm = clazz.getAnnotation(Algorithm.class);
         StringBuilder sb = new StringBuilder();
         sb.append(algorithm.name()).append(" - ").append(clazz.getName())

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/utils/DistributedCacheUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/DistributedCacheUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/DistributedCacheUtils.java
new file mode 100644
index 0000000..6abe89b
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/DistributedCacheUtils.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Optional;
+
+import java.io.IOException;
+
+import static org.apache.commons.io.FilenameUtils.getBaseName;
+
+/**
+ * Helpers for dealing with {@link org.apache.hadoop.filecache.DistributedCache}
+ */
+public class DistributedCacheUtils {
+  /** Logger */
+  private static final Logger LOG = Logger.getLogger(
+      DistributedCacheUtils.class);
+
+  /** Don't construct */
+  private DistributedCacheUtils() { }
+
+  /**
+   * Get local path to file from a DistributedCache, matched by file name.
+   *
+   * @param conf Configuration
+   * @param pathToMatch Path that was used to insert into DistributedCache
+   * @return Path matched, or Optional.absent() if no match or no cache files
+   */
+  public static Optional<Path>
+  getLocalCacheFile(Configuration conf, String pathToMatch) {
+    String nameToPath = FilenameUtils.getName(pathToMatch);
+    Path[] paths = null;
+    try {
+      paths = DistributedCache.getLocalCacheFiles(conf);
+    } catch (IOException e) {
+      LOG.warn("getLocalCacheFile: Failed to get local cache files", e);
+    }
+    // paths is null if the lookup failed or no cache files are configured
+    for (int i = 0; paths != null && i < paths.length; ++i) {
+      if (FilenameUtils.getName(paths[i].toString()).equals(nameToPath)) {
+        return Optional.of(paths[i]);
+      }
+    }
+    return Optional.absent();
+  }
+
+  /**
+   * Copy a local file to a uniquely named path under /tmp/giraph on HDFS. A
+   * path that already starts with hdfs:// is returned unchanged.
+   *
+   * @param path path to file (local, or already hdfs://)
+   * @param conf Configuration used to get the FileSystem
+   * @return path to file on HDFS.
+   */
+  public static Path copyToHdfs(Path path, Configuration conf) {
+    if (path.toString().startsWith("hdfs://")) {
+      // Already on HDFS
+      return path;
+    }
+    FileSystem fs;
+    try {
+      fs = FileSystem.get(conf);
+    } catch (IOException e) {
+      throw new IllegalArgumentException("Failed to get HDFS FileSystem", e);
+    }
+    String name = getBaseName(path.toString()) + "-" + System.nanoTime();
+    Path remotePath = new Path("/tmp/giraph", name);
+    LOG.info("copyToHdfs: Copying " + path + " to " +
+        remotePath + " on hdfs " + fs.getUri());
+    try {
+      fs.copyFromLocalFile(false, true, path, remotePath);
+    } catch (IOException e) {
+      throw new IllegalArgumentException(
+          "Failed to copy jython script from local path " + path +
+          " to hdfs path " + remotePath + " on hdfs " + fs.getUri(), e);
+    }
+    return remotePath;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/utils/FileUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/FileUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/FileUtils.java
index 442fc9f..1bc11a8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/FileUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/FileUtils.java
@@ -45,15 +45,15 @@ public class FileUtils {
   /**
    * Create a temporary folder that will be removed after the test.
    *
-   * @param vertexClass Used for generating the folder name.
+   * @param computationName Used for generating the folder name.
    * @return File object for the directory.
    */
-  public static File createTestDir(Class<?> vertexClass)
+  public static File createTestDir(String computationName)
     throws IOException {
     String systemTmpDir = System.getProperty("java.io.tmpdir");
     long simpleRandomLong = (long) (Long.MAX_VALUE * Math.random());
     File testTempDir = new File(systemTmpDir, "giraph-" +
-        vertexClass.getSimpleName() + '-' + simpleRandomLong);
+        computationName + '-' + simpleRandomLong);
     if (!testTempDir.mkdir()) {
       throw new IOException("Could not create " + testTempDir);
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
index b4920e1..9fe7663 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
@@ -101,7 +101,7 @@ public class InternalVertexRunner {
     File tmpDir = null;
     try {
       // Prepare input file, output folder and temporary folders
-      tmpDir = FileUtils.createTestDir(conf.getComputationClass());
+      tmpDir = FileUtils.createTestDir(conf.getComputationName());
 
       File vertexInputFile = null;
       File edgeInputFile = null;
@@ -137,7 +137,7 @@ public class InternalVertexRunner {
       GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkpointsDir.toString());
 
       // Create and configure the job to run the vertex
-      GiraphJob job = new GiraphJob(conf, conf.getComputationClass().getName());
+      GiraphJob job = new GiraphJob(conf, conf.getComputationName());
 
       Job internalJob = job.getInternalJob();
       if (conf.hasVertexInputFormat()) {
@@ -212,7 +212,7 @@ public class InternalVertexRunner {
     File tmpDir = null;
     try {
       // Prepare temporary folders
-      tmpDir = FileUtils.createTestDir(conf.getComputationClass());
+      tmpDir = FileUtils.createTestDir(conf.getComputationName());
 
       File zkDir = FileUtils.createTempDir(tmpDir, "_bspZooKeeper");
       File zkMgrDir = FileUtils.createTempDir(tmpDir, "_defaultZkManagerDir");
@@ -221,7 +221,7 @@ public class InternalVertexRunner {
       conf.setVertexInputFormatClass(InMemoryVertexInputFormat.class);
 
       // Create and configure the job to run the vertex
-      GiraphJob job = new GiraphJob(conf, conf.getComputationClass().getName());
+      GiraphJob job = new GiraphJob(conf, conf.getComputationName());
 
       InMemoryVertexInputFormat.setGraph(graph);
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
index 96352bb..9083769 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
@@ -19,17 +19,7 @@
 package org.apache.giraph.utils;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-
-import java.lang.reflect.Array;
-import java.lang.reflect.Field;
-import java.lang.reflect.GenericArrayType;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.lang.reflect.TypeVariable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import org.jodah.typetools.TypeResolver;
 
 /**
  * Helper methods to get type arguments to generic classes.  Courtesy of
@@ -43,29 +33,23 @@ public class ReflectionUtils {
   private ReflectionUtils() { }
 
   /**
-   * Get the underlying class for a type, or null if the type is
-   * a variable type.
+   * Get package path to the object given. Used with resources.
    *
-   * @param type the type
-   * @return the underlying class
+   * @param object the Object to check
+   * @return Path to package of object
    */
-  public static Class<?> getClass(Type type) {
-    if (type instanceof Class) {
-      return (Class<?>) type;
-    } else if (type instanceof ParameterizedType) {
-      return getClass(((ParameterizedType) type).getRawType());
-    } else if (type instanceof GenericArrayType) {
-      Type componentType =
-          ((GenericArrayType) type).getGenericComponentType();
-      Class<?> componentClass = getClass(componentType);
-      if (componentClass != null) {
-        return Array.newInstance(componentClass, 0).getClass();
-      } else {
-        return null;
-      }
-    } else {
-      return null;
-    }
+  public static String getPackagePath(Object object) {
+    return getPackagePath(object.getClass());
+  }
+
+  /**
+   * Get package path to the class given. Used with resources.
+   *
+   * @param klass Class to check
+   * @return Path to package of class
+   */
+  public static String getPackagePath(Class klass) {
+    return "/" + klass.getPackage().getName().replaceAll("\\.", "/");
   }
 
   /**
@@ -77,94 +61,9 @@ public class ReflectionUtils {
    * @param childClass the child class
    * @return a list of the raw classes for the actual type arguments.
    */
-  public static <T> List<Class<?>> getTypeArguments(
+  public static <T> Class<?>[] getTypeArguments(
       Class<T> baseClass, Class<? extends T> childClass) {
-    Map<Type, Type> resolvedTypes = new HashMap<Type, Type>();
-    Type type = childClass;
-    // start walking up the inheritance hierarchy until we hit baseClass
-    while (! getClass(type).equals(baseClass)) {
-      if (type instanceof Class) {
-        Type newType = ((Class<?>) type).getGenericSuperclass();
-        if (newType == null) {
-          // we have reached an interface, so we stop here
-          break;
-        } else {
-          // there is no useful information for us in raw types,
-          // so just keep going.
-          type = newType;
-        }
-
-      } else {
-        ParameterizedType parameterizedType = (ParameterizedType) type;
-        Class<?> rawType = (Class<?>) parameterizedType.getRawType();
-
-        Type[] actualTypeArguments =
-            parameterizedType.getActualTypeArguments();
-        TypeVariable<?>[] typeParameters = rawType.getTypeParameters();
-        for (int i = 0; i < actualTypeArguments.length; i++) {
-          resolvedTypes.put(typeParameters[i],
-              actualTypeArguments[i]);
-        }
-
-        if (!rawType.equals(baseClass)) {
-          type = rawType.getGenericSuperclass();
-        }
-      }
-    }
-
-    // finally, for each actual type argument provided to baseClass,
-    // determine (if possible) the raw class for that type argument.
-    Type[] actualTypeArguments;
-    if (type instanceof Class) {
-      actualTypeArguments = ((Class<?>) type).getTypeParameters();
-    } else {
-      actualTypeArguments =
-          ((ParameterizedType) type).getActualTypeArguments();
-    }
-    List<Class<?>> typeArgumentsAsClasses = new ArrayList<Class<?>>();
-    // resolve types by chasing down type variables.
-    for (Type baseType: actualTypeArguments) {
-      while (resolvedTypes.containsKey(baseType)) {
-        baseType = resolvedTypes.get(baseType);
-      }
-      typeArgumentsAsClasses.add(getClass(baseType));
-    }
-    return typeArgumentsAsClasses;
-  }
-
-  /**
-   * Try to directly set a (possibly private) field on an Object.
-   *
-   * @param target Target to set the field on.
-   * @param fieldname Name of field.
-   * @param value Value to set on target.
-   */
-  public static void setField(Object target, String fieldname, Object value)
-    throws NoSuchFieldException, IllegalAccessException {
-    Field field = findDeclaredField(target.getClass(), fieldname);
-    field.setAccessible(true);
-    field.set(target, value);
-  }
-
-  /**
-   * Find a declared field in a class or one of its super classes
-   *
-   * @param inClass Class to search for declared field.
-   * @param fieldname Field name to search for
-   * @return Field or will throw.
-   * @throws NoSuchFieldException When field not found.
-   */
-  private static Field findDeclaredField(Class<?> inClass, String fieldname)
-    throws NoSuchFieldException {
-    while (!Object.class.equals(inClass)) {
-      for (Field field : inClass.getDeclaredFields()) {
-        if (field.getName().equalsIgnoreCase(fieldname)) {
-          return field;
-        }
-      }
-      inClass = inClass.getSuperclass();
-    }
-    throw new NoSuchFieldException();
+    return TypeResolver.resolveArguments(childClass, baseClass);
   }
 
   /**
@@ -199,7 +98,7 @@ public class ReflectionUtils {
   public static <T> T newInstance(
       Class<T> theClass,
       ImmutableClassesGiraphConfiguration configuration) {
-    T result = null;
+    T result;
     try {
       result = theClass.newInstance();
     } catch (InstantiationException e) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/resources/org/apache/giraph/benchmark/page-rank.py
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/resources/org/apache/giraph/benchmark/page-rank.py b/giraph-core/src/main/resources/org/apache/giraph/benchmark/page-rank.py
new file mode 100644
index 0000000..5ff918c
--- /dev/null
+++ b/giraph-core/src/main/resources/org/apache/giraph/benchmark/page-rank.py
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from org.apache.giraph.graph import BasicComputation
+from org.apache.hadoop.io import FloatWritable
+
+class PageRank(BasicComputation):
+  SUPERSTEP_COUNT = "giraph.pageRank.superstepCount"
+
+  def compute(self, vertex, messages):
+    if self.getSuperstep() >= 1:
+      total = 0
+      for msg in messages:
+        total += msg.get()
+      vertex.getValue().set((0.15 / self.getTotalNumVertices()) + 0.85 * total)
+
+    if self.getSuperstep() < self.getConf().getInt(self.SUPERSTEP_COUNT, 0):
+      self.sendMessageToAllEdges(vertex,
+          FloatWritable(vertex.getValue().get() / vertex.getNumEdges()))
+    else:
+      vertex.voteToHalt()

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/test/java/org/apache/giraph/BspCase.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/BspCase.java b/giraph-core/src/test/java/org/apache/giraph/BspCase.java
index 137cb6e..dd2369a 100644
--- a/giraph-core/src/test/java/org/apache/giraph/BspCase.java
+++ b/giraph-core/src/test/java/org/apache/giraph/BspCase.java
@@ -30,8 +30,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.log4j.Level;
-import org.apache.log4j.LogManager;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.junit.After;
@@ -66,8 +64,8 @@ public class BspCase implements Watcher {
   /** Default path for temporary files */
   static final Path DEFAULT_TEMP_DIR =
       new Path(System.getProperty("java.io.tmpdir"), "_giraphTests");
-  
-  public static final String READER_VERTICES =
+
+  public static final String READER_VERTICES_OPT =
 		  		    "GeneratedVertexReader.reader_vertices";
 
   /** A filter for listing parts files */
@@ -109,7 +107,7 @@ public class BspCase implements Watcher {
       conf.setZooKeeperConfiguration(getZooKeeperList());
     }
     // GeneratedInputSplit will generate 5 vertices
-    conf.setLong(READER_VERTICES, 5);
+    conf.setLong(READER_VERTICES_OPT, 5);
 
     // Setup pathes for temporary files
     Path zookeeperDir = getTempPath("_bspZooKeeper");

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/test/java/org/apache/giraph/jython/TestJython.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/jython/TestJython.java b/giraph-core/src/test/java/org/apache/giraph/jython/TestJython.java
new file mode 100644
index 0000000..245d342
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/jython/TestJython.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.jython;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphTypes;
+import org.apache.giraph.edge.ByteArrayEdges;
+import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
+import org.apache.giraph.io.formats.IntNullTextEdgeInputFormat;
+import org.apache.giraph.utils.InternalVertexRunner;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+
+import static org.apache.giraph.jython.JythonComputationFactory.JYTHON_DEPLOY_TYPE;
+import static org.junit.Assert.assertEquals;
+
+public class TestJython {
+  @Test
+  public void testCountEdges() throws Exception {
+    String[] edges = new String[] {
+        "1 2",
+        "2 3",
+        "2 4",
+        "4 1"
+    };
+
+    GiraphConfiguration conf = new GiraphConfiguration();
+    GiraphTypes types = new GiraphTypes(IntWritable.class, IntWritable.class,
+        NullWritable.class, NullWritable.class, NullWritable.class);
+    JythonUtils.init(conf, "count-edges.py", "CountEdges", types);
+    JYTHON_DEPLOY_TYPE.set(conf, DeployType.RESOURCE);
+    conf.setOutEdgesClass(ByteArrayEdges.class);
+    conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
+    conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+    Iterable<String> results = InternalVertexRunner.run(conf, null, edges);
+
+    Map<Integer, Integer> values = parseResults(results);
+
+    // Check that all vertices with outgoing edges have been created
+    assertEquals(3, values.size());
+    // Check the number of edges for each vertex
+    assertEquals(1, (int) values.get(1));
+    assertEquals(2, (int) values.get(2));
+    assertEquals(1, (int) values.get(4));
+  }
+
+  private static Map<Integer, Integer> parseResults(Iterable<String> results) {
+    Map<Integer, Integer> values = Maps.newHashMap();
+    for (String line : results) {
+      String[] tokens = line.split("\\s+");
+      int id = Integer.valueOf(tokens[0]);
+      int value = Integer.valueOf(tokens[1]);
+      values.put(id, value);
+    }
+    return values;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/test/java/org/apache/giraph/utils/TestReflectionUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/utils/TestReflectionUtils.java b/giraph-core/src/test/java/org/apache/giraph/utils/TestReflectionUtils.java
new file mode 100644
index 0000000..c9b4ace
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/utils/TestReflectionUtils.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import org.apache.giraph.conf.TypesHolder;
+import org.apache.giraph.edge.ByteArrayEdges;
+import org.apache.giraph.edge.OutEdges;
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.graph.Computation;
+import org.apache.giraph.graph.DefaultVertexResolver;
+import org.apache.giraph.graph.DefaultVertexValueFactory;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexResolver;
+import org.apache.giraph.graph.VertexValueFactory;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.giraph.utils.ReflectionUtils.getTypeArguments;
+import static org.junit.Assert.assertEquals;
+
+public class TestReflectionUtils {
+  @Test
+  public void testPackagePath() {
+    assertEquals("/org/apache/giraph/utils",
+        ReflectionUtils.getPackagePath(TestReflectionUtils.class));
+    assertEquals("/org/apache/giraph/utils",
+        ReflectionUtils.getPackagePath(getClass()));
+    assertEquals("/org/apache/giraph/utils",
+        ReflectionUtils.getPackagePath(this));
+  }
+
+  private static class IntTypes implements TypesHolder<IntWritable,
+      IntWritable, IntWritable, IntWritable, IntWritable> { }
+
+  private static class IntComputation extends Computation<IntWritable,
+        IntWritable, IntWritable, IntWritable, IntWritable> {
+    @Override
+    public void compute(Vertex<IntWritable, IntWritable, IntWritable> vertex,
+        Iterable<IntWritable> messages) throws IOException {
+    }
+  }
+
+  private static class IntBasicComputation extends BasicComputation<IntWritable,
+      IntWritable, IntWritable, IntWritable> {
+    @Override
+    public void compute(Vertex<IntWritable, IntWritable, IntWritable> vertex,
+        Iterable<IntWritable> messages) throws IOException {
+    }
+  }
+
+  @Test
+  public void testInferWithGenerics() {
+    Class<?>[] classes = getTypeArguments(VertexResolver.class,
+        DefaultVertexResolver.class);
+    assertEquals(3, classes.length);
+    assertEquals(WritableComparable.class, classes[0]);
+    assertEquals(Writable.class, classes[1]);
+    assertEquals(Writable.class, classes[2]);
+    classes = getTypeArguments(VertexValueFactory.class,
+        DefaultVertexValueFactory.class);
+    assertEquals(1, classes.length);
+    assertEquals(Writable.class, classes[0]);
+    classes = getTypeArguments(OutEdges.class, ByteArrayEdges.class);
+    assertEquals(2, classes.length);
+    assertEquals(WritableComparable.class, classes[0]);
+    assertEquals(Writable.class, classes[1]);
+  }
+
+  @Test
+  public void testInferTypeParams() {
+    checkTypes(TypesHolder.class, IntTypes.class, 5);
+    checkTypes(TypesHolder.class, IntComputation.class, 5);
+    checkTypes(Computation.class, IntComputation.class, 5);
+    checkTypes(TypesHolder.class, IntBasicComputation.class, 5);
+    checkTypes(Computation.class, IntBasicComputation.class, 5);
+    checkTypes(BasicComputation.class, IntBasicComputation.class, 4);
+  }
+
+  private <T> void checkTypes(Class<T> baseClass,
+    Class<? extends T> childClass, int numArgs) {
+    Class<?>[] classes = getTypeArguments(baseClass, childClass);
+    assertEquals(numArgs, classes.length);
+    for (Class<?> klass : classes) {
+      assertEquals(IntWritable.class, klass);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/test/resources/org/apache/giraph/jython/count-edges.py
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/resources/org/apache/giraph/jython/count-edges.py b/giraph-core/src/test/resources/org/apache/giraph/jython/count-edges.py
new file mode 100644
index 0000000..4cad945
--- /dev/null
+++ b/giraph-core/src/test/resources/org/apache/giraph/jython/count-edges.py
@@ -0,0 +1,8 @@
+from org.apache.giraph.graph import BasicComputation
+from org.apache.hadoop.io import IntWritable
+
+class CountEdges(BasicComputation):
+  def compute(self, vertex, messages):
+    num_edges = vertex.getNumEdges()
+    vertex.setValue(IntWritable(num_edges))
+    vertex.voteToHalt()

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-examples/src/main/java/org/apache/giraph/examples/GeneratedVertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/GeneratedVertexReader.java b/giraph-examples/src/main/java/org/apache/giraph/examples/GeneratedVertexReader.java
index 13e8edf..1d74843 100644
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/GeneratedVertexReader.java
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/GeneratedVertexReader.java
@@ -18,14 +18,17 @@
 
 package org.apache.giraph.examples;
 
-import java.io.IOException;
 import org.apache.giraph.bsp.BspInputSplit;
+import org.apache.giraph.conf.BooleanConfOption;
+import org.apache.giraph.conf.LongConfOption;
 import org.apache.giraph.io.VertexReader;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
+import java.io.IOException;
+
 /**
  * Used by GeneratedVertexInputFormat to read some generated data
  *
@@ -39,15 +42,11 @@ public abstract class GeneratedVertexReader<
     E extends Writable>
     extends VertexReader<I, V, E> {
   /** Vertices produced by this reader */
-  public static final String READER_VERTICES =
-    "GeneratedVertexReader.reader_vertices";
-  /** Default vertices produced by this reader */
-  public static final long DEFAULT_READER_VERTICES = 10;
+  public static final LongConfOption READER_VERTICES =
+      new LongConfOption("GeneratedVertexReader.reader_vertices", 10);
   /** Reverse the order of the vertices? */
-  public static final String REVERSE_ID_ORDER =
-    "GeneratedVertexReader.reverseIdOrder";
-  /** Default ordering is not reversed */
-  public static final boolean DEAFULT_REVERSE_ID_ORDER = false;
+  public static final BooleanConfOption REVERSE_ID_ORDER =
+      new BooleanConfOption("GeneratedVertexReader.reverseIdOrder", false);
   /** Records read so far */
   protected long recordsRead = 0;
   /** Total records to read (on this split alone) */
@@ -66,12 +65,8 @@ public abstract class GeneratedVertexReader<
   @Override
   public final void initialize(InputSplit inputSplit,
       TaskAttemptContext context) throws IOException {
-    totalRecords = getConf().getLong(
-        GeneratedVertexReader.READER_VERTICES,
-        GeneratedVertexReader.DEFAULT_READER_VERTICES);
-    reverseIdOrder = getConf().getBoolean(
-        GeneratedVertexReader.REVERSE_ID_ORDER,
-        GeneratedVertexReader.DEAFULT_REVERSE_ID_ORDER);
+    totalRecords = GeneratedVertexReader.READER_VERTICES.get(getConf());
+    reverseIdOrder = GeneratedVertexReader.REVERSE_ID_ORDER.get(getConf());
     this.inputSplit = (BspInputSplit) inputSplit;
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
index 6c8a0b3..c4286ab 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
@@ -243,7 +243,7 @@ public class
     GiraphJob job = prepareJob(callingMethod, conf, outputPath);
     Configuration configuration = job.getConfiguration();
     // GeneratedInputSplit will generate 10 vertices
-    configuration.setLong(GeneratedVertexReader.READER_VERTICES, 10);
+    GeneratedVertexReader.READER_VERTICES.set(configuration, 10);
     assertTrue(job.run(true));
     if (!runningInDistributedMode()) {
       FileStatus fileStatus = getSinglePartFileStatus(configuration, outputPath);
@@ -284,7 +284,7 @@ public class
     conf.setComputationClass(SimpleMsgComputation.class);
     conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
     GiraphJob job = prepareJob(getCallingMethodName(), conf);
-    job.getConfiguration().setLong(GeneratedVertexReader.READER_VERTICES, 0);
+    GeneratedVertexReader.READER_VERTICES.set(job.getConfiguration(), 0);
     assertTrue(job.run(true));
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-examples/src/test/java/org/apache/giraph/TestComputationState.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestComputationState.java b/giraph-examples/src/test/java/org/apache/giraph/TestComputationState.java
index 1092eac..75b98e4 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestComputationState.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestComputationState.java
@@ -54,8 +54,7 @@ public class TestComputationState extends BspCase {
     job.getConfiguration().setNumComputeThreads(
         TestComputationStateComputation.NUM_COMPUTE_THREADS);
     // Increase the number of vertices
-    job.getConfiguration().setInt(
-        GeneratedVertexReader.READER_VERTICES,
+    GeneratedVertexReader.READER_VERTICES.set(job.getConfiguration(),
         TestComputationStateComputation.NUM_VERTICES);
     // Increase the number of partitions
     GiraphConstants.USER_PARTITION_COUNT.set(job.getConfiguration(),

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java b/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java
index 4537eac..bf87491 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java
@@ -36,6 +36,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 
+import static org.apache.giraph.examples.GeneratedVertexReader.READER_VERTICES;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -158,8 +159,7 @@ public class TestGraphPartitioner extends BspCase {
         outputPath);
     job.getConfiguration().setGraphPartitionerFactoryClass(
         SuperstepHashPartitionerFactory.class);
-    job.getConfiguration().setBoolean(
-        GeneratedVertexReader.REVERSE_ID_ORDER, true);
+    GeneratedVertexReader.REVERSE_ID_ORDER.set(job.getConfiguration(), true);
     assertTrue(job.run(true));
     verifyOutput(hdfs, outputPath);
 
@@ -178,8 +178,8 @@ public class TestGraphPartitioner extends BspCase {
 
     job.getConfiguration().setGraphPartitionerFactoryClass(
         SimpleLongRangePartitionerFactory.class);
-    long readerVertices = job.getConfiguration().getLong(
-        GeneratedVertexReader.READER_VERTICES, -1);
+    long readerVertices =
+        READER_VERTICES.getWithDefault(job.getConfiguration(), -1L);
     job.getConfiguration().setLong(
         GiraphConstants.PARTITION_VERTEX_KEY_SPACE_SIZE, readerVertices);
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java b/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
index 340b461..589fed6 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
@@ -24,7 +24,6 @@ import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
-import org.apache.giraph.conf.GiraphClasses;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.graph.Computation;
@@ -635,8 +634,6 @@ public class HiveGiraphRunner implements Tool {
    * @param giraphConf GiraphConfiguration
    */
   private void logOptions(GiraphConfiguration giraphConf) {
-    GiraphClasses<?, ?, ?> classes = new GiraphClasses(giraphConf);
-
     LOG.info(getClass().getSimpleName() + " with");
 
     LOG.info(LOG_PREFIX + "-computationClass=" +
@@ -650,7 +647,7 @@ public class HiveGiraphRunner implements Tool {
       LOG.info(LOG_PREFIX + "Edge input: " + description);
     }
 
-    if (classes.getVertexOutputFormatClass() != null) {
+    if (GiraphConstants.VERTEX_OUTPUT_FORMAT_CLASS.contains(giraphConf)) {
       LOG.info(LOG_PREFIX + "Output: VertexToHive=" +
           vertexToHiveClass.getCanonicalName() + ", table=" +
           HIVE_VERTEX_OUTPUT_TABLE.get(conf) + ", partition=\"" +

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0bd85a4..7e166c4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -255,6 +255,9 @@ under the License.
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 
     <dep.hiveio.version>0.11</dep.hiveio.version>
+    <dep.jython.version>2.5.3</dep.jython.version>
+    <dep.typetools.version>0.2.1</dep.typetools.version>
+
     <hbase.version>0.90.5</hbase.version>
     <codehaus-jackson.version>1.8.0</codehaus-jackson.version>
     <fasterxml-jackson.version>2.1.0</fasterxml-jackson.version>
@@ -1107,11 +1110,21 @@ under the License.
         <version>${codehaus-jackson.version}</version>
       </dependency>
       <dependency>
+        <groupId>org.jodah</groupId>
+        <artifactId>typetools</artifactId>
+        <version>${dep.typetools.version}</version>
+      </dependency>
+      <dependency>
         <groupId>org.json</groupId>
         <artifactId>json</artifactId>
         <version>20090211</version>
       </dependency>
       <dependency>
+        <groupId>org.python</groupId>
+        <artifactId>jython</artifactId>
+        <version>${dep.jython.version}</version>
+      </dependency>
+      <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-api</artifactId>
         <version>${slf4j.version}</version>


[2/2] git commit: updated refs/heads/trunk to 8f89bd8

Posted by ni...@apache.org.
GIRAPH-683: Jython for Computation (nitay)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/8f89bd85
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/8f89bd85
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/8f89bd85

Branch: refs/heads/trunk
Commit: 8f89bd85a03a1fec25e21e334631931f69078040
Parents: 1eaddd1
Author: Nitay Joffe <ni...@apache.org>
Authored: Thu Jun 20 14:21:48 2013 -0400
Committer: Nitay Joffe <ni...@apache.org>
Committed: Thu Jun 20 14:22:03 2013 -0400

----------------------------------------------------------------------
 CHANGELOG                                       |   2 +
 README                                          |  19 ++
 giraph-core/pom.xml                             |   8 +
 .../giraph/benchmark/BenchmarkOption.java       |   7 +
 .../giraph/benchmark/PageRankBenchmark.java     |  32 +-
 .../apache/giraph/conf/AbstractConfOption.java  |  22 ++
 .../java/org/apache/giraph/conf/AllOptions.java |   3 +
 .../apache/giraph/conf/BooleanConfOption.java   |  17 +-
 .../org/apache/giraph/conf/ClassConfOption.java |  27 ++
 .../org/apache/giraph/conf/ConfOptionType.java  |   4 +-
 .../org/apache/giraph/conf/EnumConfOption.java  |  95 ++++++
 .../org/apache/giraph/conf/FloatConfOption.java |  10 +-
 .../org/apache/giraph/conf/GiraphClasses.java   | 105 ++++---
 .../apache/giraph/conf/GiraphConfiguration.java |  35 ++-
 .../org/apache/giraph/conf/GiraphConstants.java |  20 +-
 .../org/apache/giraph/conf/GiraphTypes.java     | 213 +++++++++++++
 .../ImmutableClassesGiraphConfiguration.java    |  38 ++-
 .../org/apache/giraph/conf/IntConfOption.java   |  11 +-
 .../org/apache/giraph/conf/LongConfOption.java  |  21 +-
 .../org/apache/giraph/conf/StrConfOption.java   |  12 +
 .../org/apache/giraph/conf/TypesHolder.java     |  37 +++
 .../org/apache/giraph/graph/Computation.java    |   9 +-
 .../apache/giraph/graph/ComputationFactory.java |  69 +++++
 .../giraph/graph/DefaultComputationFactory.java |  63 ++++
 .../apache/giraph/graph/GraphTaskManager.java   |  45 +--
 .../java/org/apache/giraph/graph/Language.java  |  28 ++
 .../io/formats/IntIntNullTextInputFormat.java   |   7 +-
 .../io/formats/LongLongNullTextInputFormat.java |  89 ++++++
 .../job/GiraphConfigurationValidator.java       | 309 ++++++++-----------
 .../org/apache/giraph/jython/DeployType.java    |  28 ++
 .../giraph/jython/JythonComputationFactory.java | 161 ++++++++++
 .../org/apache/giraph/jython/JythonUtils.java   |  74 +++++
 .../org/apache/giraph/jython/package-info.java  |  21 ++
 .../apache/giraph/master/SuperstepClasses.java  |  37 ++-
 .../apache/giraph/utils/ConfigurationUtils.java | 191 ++++++++----
 .../giraph/utils/DistributedCacheUtils.java     | 101 ++++++
 .../java/org/apache/giraph/utils/FileUtils.java |   6 +-
 .../giraph/utils/InternalVertexRunner.java      |   8 +-
 .../apache/giraph/utils/ReflectionUtils.java    | 139 ++-------
 .../org/apache/giraph/benchmark/page-rank.py    |  34 ++
 .../test/java/org/apache/giraph/BspCase.java    |   8 +-
 .../org/apache/giraph/jython/TestJython.java    |  77 +++++
 .../giraph/utils/TestReflectionUtils.java       | 106 +++++++
 .../org/apache/giraph/jython/count-edges.py     |   8 +
 .../giraph/examples/GeneratedVertexReader.java  |  25 +-
 .../java/org/apache/giraph/TestBspBasic.java    |   4 +-
 .../org/apache/giraph/TestComputationState.java |   3 +-
 .../org/apache/giraph/TestGraphPartitioner.java |   8 +-
 .../apache/giraph/hive/HiveGiraphRunner.java    |   5 +-
 pom.xml                                         |  13 +
 50 files changed, 1907 insertions(+), 507 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 76f7e6b..dda130e 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-683: Jython for Computation (nitay)
+
   GIRAPH-673: Input superstep should support aggregators like any 
   other superstep (Bingjing via aching)
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/README
----------------------------------------------------------------------
diff --git a/README b/README
index c2a32ed..fd9ec20 100644
--- a/README
+++ b/README
@@ -131,6 +131,25 @@ locally first.
 
 -------------------------------
 
+Scripting:
+
+Giraph has support for writing user logic in languages other than Java. A Giraph
+job involves at the very least a Computation and Input/Output Formats. There are
+other optional pieces as well like Aggregators and Combiners.
+
+As of this writing we support writing the Computation logic in Jython. The
+Computation class is at the core of the algorithm so it was a natural starting
+point. Eventually it is our goal to allow users to write any / all components of
+their algorithms in any language they desire.
+
+To use Jython with our job launcher, GiraphRunner, pass the path to the script
+as the Computation class argument. Additionally, you should set the -jythonClass
+option to let Giraph know the name of your Jython Computation class. Lastly, you
+will need to set -typesHolder to a class that implements Giraph's TypesHolder so
+that Giraph can infer the types you use. Look at page-rank.py as an example.
+
+-------------------------------
+
 How to run the unittests on a local pseudo-distributed Hadoop instance:
 
 As mentioned earlier, Giraph supports several versions of Hadoop.  In

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/pom.xml
----------------------------------------------------------------------
diff --git a/giraph-core/pom.xml b/giraph-core/pom.xml
index 3ffe175..fdfff4b 100644
--- a/giraph-core/pom.xml
+++ b/giraph-core/pom.xml
@@ -457,10 +457,18 @@ under the License.
       <artifactId>jackson-mapper-asl</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.jodah</groupId>
+      <artifactId>typetools</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.json</groupId>
       <artifactId>json</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.python</groupId>
+      <artifactId>jython</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/benchmark/BenchmarkOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/BenchmarkOption.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/BenchmarkOption.java
index 23c614b..ca1fd19 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/BenchmarkOption.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/BenchmarkOption.java
@@ -55,6 +55,13 @@ public class BenchmarkOption {
       new BenchmarkOption(
           "l", "localEdgesMinRatio", true,
           "Minimum ratio of partition-local edges (default is 0)");
+  /** Option for using Jython */
+  public static final BenchmarkOption JYTHON =
+      new BenchmarkOption("j", "jython", false, "Use jython implementation");
+  /** Option for path to script for computation */
+  public static final BenchmarkOption SCRIPT_PATH =
+      new BenchmarkOption("sp", "scriptPath", true,
+          "Path to script for computation, can be local or HDFS path");
 
   /** Short option */
   private String shortOption;

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
index bd2939e..d88ff0d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
@@ -21,15 +21,24 @@ package org.apache.giraph.benchmark;
 import org.apache.commons.cli.CommandLine;
 import org.apache.giraph.combiner.FloatSumCombiner;
 import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphTypes;
 import org.apache.giraph.edge.IntNullArrayEdges;
 import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants;
 import org.apache.giraph.io.formats.PseudoRandomIntNullVertexInputFormat;
+import org.apache.giraph.jython.DeployType;
+import org.apache.giraph.jython.JythonUtils;
+import org.apache.giraph.utils.DistributedCacheUtils;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.ToolRunner;
 
 import com.google.common.collect.Sets;
 
 import java.util.Set;
 
+import static org.apache.giraph.jython.JythonComputationFactory.JYTHON_DEPLOY_TYPE;
+
 /**
  * Benchmark for {@link PageRankComputation}
  */
@@ -38,13 +47,32 @@ public class PageRankBenchmark extends GiraphBenchmark {
   public Set<BenchmarkOption> getBenchmarkOptions() {
     return Sets.newHashSet(BenchmarkOption.VERTICES,
         BenchmarkOption.EDGES_PER_VERTEX, BenchmarkOption.SUPERSTEPS,
-        BenchmarkOption.LOCAL_EDGES_MIN_RATIO);
+        BenchmarkOption.LOCAL_EDGES_MIN_RATIO, BenchmarkOption.JYTHON,
+        BenchmarkOption.SCRIPT_PATH);
   }
 
   @Override
   protected void prepareConfiguration(GiraphConfiguration conf,
       CommandLine cmd) {
-    conf.setComputationClass(PageRankComputation.class);
+    if (BenchmarkOption.JYTHON.optionTurnedOn(cmd)) {
+      GiraphTypes types = new GiraphTypes();
+      types.inferFrom(PageRankComputation.class);
+      String script;
+      if (BenchmarkOption.SCRIPT_PATH.optionTurnedOn(cmd)) {
+        JYTHON_DEPLOY_TYPE.set(conf, DeployType.DISTRIBUTED_CACHE);
+        String path = BenchmarkOption.SCRIPT_PATH.getOptionValue(cmd);
+        Path hadoopPath = new Path(path);
+        Path remotePath = DistributedCacheUtils.copyToHdfs(hadoopPath, conf);
+        DistributedCache.addCacheFile(remotePath.toUri(), conf);
+        script = remotePath.toString();
+      } else {
+        JYTHON_DEPLOY_TYPE.set(conf, DeployType.RESOURCE);
+        script = ReflectionUtils.getPackagePath(this) + "/page-rank.py";
+      }
+      JythonUtils.init(conf, script, "PageRank", types);
+    } else {
+      conf.setComputationClass(PageRankComputation.class);
+    }
     conf.setOutEdgesClass(IntNullArrayEdges.class);
     conf.setCombinerClass(FloatSumCombiner.class);
     conf.setVertexInputFormatClass(

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/conf/AbstractConfOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/AbstractConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/AbstractConfOption.java
index d00f7e9..e1ab802 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/AbstractConfOption.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/AbstractConfOption.java
@@ -17,6 +17,7 @@
  */
 package org.apache.giraph.conf;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Logger;
 
 import com.google.common.base.Objects;
@@ -35,6 +36,7 @@ public abstract class AbstractConfOption
 
   /**
    * Constructor
+   *
    * @param key configuration key
    */
   public AbstractConfOption(String key) {
@@ -45,6 +47,16 @@ public abstract class AbstractConfOption
     return key;
   }
 
+  /**
+   * Check whether this option's key has a value in the given configuration
+   *
+   * @param conf Configuration to look the key up in
+   * @return true if {@code conf.get(key)} is non-null
+   */
+  public boolean contains(Configuration conf) {
+    return conf.get(key) != null;
+  }
+
   @Override public int compareTo(AbstractConfOption o) {
     return ComparisonChain.start()
       .compare(getType(), o.getType())
@@ -79,13 +91,23 @@ public abstract class AbstractConfOption
   }
 
   /**
+   * Check if the value set is the same as the default value
+   *
+   * @param conf Configuration
+   * @return true if value set is default value
+   */
+  public abstract boolean isDefaultValue(Configuration conf);
+
+  /**
    * Get string representation of default value
+   *
    * @return String
    */
   public abstract String getDefaultValueStr();
 
   /**
    * Get type this option holds
+   *
    * @return ConfOptionType
    */
   public abstract ConfOptionType getType();

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/conf/AllOptions.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/AllOptions.java b/giraph-core/src/main/java/org/apache/giraph/conf/AllOptions.java
index 5d150d0..68b3ed9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/AllOptions.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/AllOptions.java
@@ -43,6 +43,7 @@ public class AllOptions {
   /**
    * Add an option. Subclasses of {@link AbstractConfOption} should call this
    * at the end of their constructor.
+   *
    * @param confOption option
    */
   public static void add(AbstractConfOption confOption) {
@@ -51,6 +52,7 @@ public class AllOptions {
 
   /**
    * String representation of all of the options stored
+   *
    * @return string
    */
   public static String allOptionsString() {
@@ -70,6 +72,7 @@ public class AllOptions {
 
   /**
    * Command line utility to dump all Giraph options
+   *
    * @param args cmdline args
    */
   public static void main(String[] args) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/conf/BooleanConfOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/BooleanConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/BooleanConfOption.java
index c16ec88..f095905 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/BooleanConfOption.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/BooleanConfOption.java
@@ -28,6 +28,7 @@ public class BooleanConfOption extends AbstractConfOption {
 
   /**
    * Constructor
+   *
    * @param key configuration key
    * @param defaultValue default value
    */
@@ -37,10 +38,19 @@ public class BooleanConfOption extends AbstractConfOption {
     AllOptions.add(this);
   }
 
-  public boolean isDefaultValue() {
+  /**
+   * Get the default value of this option
+   *
+   * @return default value
+   */
+  public boolean getDefaultValue() {
     return defaultValue;
   }
 
+  @Override public boolean isDefaultValue(Configuration conf) {
+    return get(conf) == defaultValue;
+  }
+
   @Override public String getDefaultValueStr() {
     return Boolean.toString(defaultValue);
   }
@@ -51,6 +61,7 @@ public class BooleanConfOption extends AbstractConfOption {
 
   /**
    * Lookup value in Configuration
+   *
    * @param conf Configuration
    * @return value for key in conf, or defaultValue if not present
    */
@@ -60,6 +71,7 @@ public class BooleanConfOption extends AbstractConfOption {
 
   /**
    * Check if value is true
+   *
    * @param conf Configuration
    * @return true if value is set and true, false otherwise
    */
@@ -69,6 +81,7 @@ public class BooleanConfOption extends AbstractConfOption {
 
   /**
    * Check if value is false
+   *
    * @param conf Configuration
    * @return true if value is set and true, false otherwise
    */
@@ -78,6 +91,7 @@ public class BooleanConfOption extends AbstractConfOption {
 
   /**
    * Set value in configuration for this key
+   *
    * @param conf Configuration
    * @param value to set
    */
@@ -87,6 +101,7 @@ public class BooleanConfOption extends AbstractConfOption {
 
   /**
    * Set value in configuration if it hasn't been set already
+   *
    * @param conf Configuration
    * @param value to set
    */

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/conf/ClassConfOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ClassConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/ClassConfOption.java
index 41d120b..43d6b0f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ClassConfOption.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ClassConfOption.java
@@ -20,8 +20,11 @@ package org.apache.giraph.conf;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Logger;
 
+import com.google.common.base.Objects;
+
 /**
  * Class configuration option
+ *
  * @param <C> interface of class
  */
 public class ClassConfOption<C> extends AbstractConfOption {
@@ -35,6 +38,7 @@ public class ClassConfOption<C> extends AbstractConfOption {
 
   /**
    * Private constructor
+   *
    * @param key Key
    * @param defaultClass default class
    * @param interfaceClass interface class
@@ -49,6 +53,7 @@ public class ClassConfOption<C> extends AbstractConfOption {
 
   /**
    * Static create method
+   *
    * @param key key
    * @param defaultClass default class
    * @param interfaceClass interface class
@@ -68,6 +73,10 @@ public class ClassConfOption<C> extends AbstractConfOption {
     return interfaceClass;
   }
 
+  @Override public boolean isDefaultValue(Configuration conf) {
+    return Objects.equal(get(conf), defaultClass);
+  }
+
   @Override public String getDefaultValueStr() {
     return defaultClass == null ? "null" : defaultClass.getSimpleName();
   }
@@ -87,6 +96,7 @@ public class ClassConfOption<C> extends AbstractConfOption {
 
   /**
    * Lookup value
+   *
    * @param conf Configuration
    * @return Class set for key, or defaultClass
    */
@@ -96,6 +106,7 @@ public class ClassConfOption<C> extends AbstractConfOption {
 
   /**
    * Lookup array of classes for key
+   *
    * @param conf Configuration
    * @return array of classes
    */
@@ -127,6 +138,7 @@ public class ClassConfOption<C> extends AbstractConfOption {
 
   /**
    * Lookup with user specified default value
+   *
    * @param conf Configuration
    * @param defaultValue default value
    * @return Class
@@ -138,6 +150,7 @@ public class ClassConfOption<C> extends AbstractConfOption {
 
   /**
    * Set value for key
+   *
    * @param conf Configuration
    * @param klass Class to set
    */
@@ -146,7 +159,20 @@ public class ClassConfOption<C> extends AbstractConfOption {
   }
 
   /**
+   * Set value for key if it is not already set
+   *
+   * @param conf Configuration
+   * @param klass Class to set
+   */
+  public void setIfUnset(Configuration conf, Class<? extends C> klass) {
+    if (!contains(conf)) {
+      set(conf, klass);
+    }
+  }
+
+  /**
    * Set classes for this key
+   *
    * @param conf Configuration
    * @param klasses Classes to set
    */
@@ -165,6 +191,7 @@ public class ClassConfOption<C> extends AbstractConfOption {
 
   /**
    * Add class to list for key
+   *
    * @param conf Configuration
    * @param klass Class to add
    */

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/conf/ConfOptionType.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ConfOptionType.java b/giraph-core/src/main/java/org/apache/giraph/conf/ConfOptionType.java
index 8f70d90..edb878a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ConfOptionType.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ConfOptionType.java
@@ -25,6 +25,8 @@ public enum ConfOptionType {
   BOOLEAN,
   /** class */
   CLASS,
+  /** enum */
+  ENUM,
   /** integer */
   INTEGER,
   /** float */
@@ -32,5 +34,5 @@ public enum ConfOptionType {
   /** long */
   LONG,
   /** string */
-  STRING;
+  STRING
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/conf/EnumConfOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/EnumConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/EnumConfOption.java
new file mode 100644
index 0000000..0e23379
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/EnumConfOption.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.conf;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Objects;
+
+/**
+ * Enum Configuration option
+ *
+ * @param <T> Enum class
+ */
+public class EnumConfOption<T extends Enum<T>> extends AbstractConfOption {
+  /** Enum class, used to map a stored constant name back to its value */
+  private final Class<T> klass;
+  /** Default value, may be null */
+  private final T defaultValue;
+
+  /**
+   * Constructor, which also registers the option with AllOptions
+   *
+   * @param key Configuration key
+   * @param klass Enum class
+   * @param defaultValue default value, may be null
+   */
+  public EnumConfOption(String key, Class<T> klass, T defaultValue) {
+    super(key);
+    this.klass = klass;
+    this.defaultValue = defaultValue;
+    AllOptions.add(this);
+  }
+
+  /**
+   * Create new EnumConfOption
+   *
+   * @param key String configuration key
+   * @param klass enum class
+   * @param defaultValue default enum value, may be null
+   * @param <X> enum type
+   * @return EnumConfOption
+   */
+  public static <X extends Enum<X>> EnumConfOption<X>
+  create(String key, Class<X> klass, X defaultValue) {
+    return new EnumConfOption<X>(key, klass, defaultValue);
+  }
+
+  @Override public boolean isDefaultValue(Configuration conf) {
+    return Objects.equal(get(conf), defaultValue);
+  }
+
+  @Override public String getDefaultValueStr() {
+    return defaultValue == null ? "null" : defaultValue.name();
+  }
+
+  @Override public ConfOptionType getType() {
+    return ConfOptionType.ENUM;
+  }
+
+  /**
+   * Lookup value, which is stored in the configuration as a constant name
+   *
+   * @param conf Configuration
+   * @return value for key, or defaultValue if not present
+   */
+  public T get(Configuration conf) {
+    String valueStr = conf.get(getKey());
+    return valueStr == null ? defaultValue : Enum.valueOf(klass, valueStr);
+  }
+
+  /**
+   * Set value, stored in the configuration as the constant's name
+   *
+   * @param conf Configuration
+   * @param value to set
+   */
+  public void set(Configuration conf, Enum<T> value) {
+    conf.set(getKey(), value.name());
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/conf/FloatConfOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/FloatConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/FloatConfOption.java
index fa21a28..62efdbe 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/FloatConfOption.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/FloatConfOption.java
@@ -28,6 +28,7 @@ public class FloatConfOption extends AbstractConfOption {
 
   /**
    * Constructor
+   *
    * @param key Configuration key
    * @param defaultValue default value
    */
@@ -41,6 +42,10 @@ public class FloatConfOption extends AbstractConfOption {
     return defaultValue;
   }
 
+  @Override public boolean isDefaultValue(Configuration conf) {
+    return Float.compare(get(conf), defaultValue) == 0;
+  }
+
   @Override public String getDefaultValueStr() {
     return Float.toString(defaultValue);
   }
@@ -51,6 +56,7 @@ public class FloatConfOption extends AbstractConfOption {
 
   /**
    * Lookup value
+   *
    * @param conf Configuration
    * @return value for key, or defaultValue if not present
    */
@@ -60,6 +66,7 @@ public class FloatConfOption extends AbstractConfOption {
 
   /**
    * Set value
+   *
    * @param conf Configuration
    * @param value to set
    */
@@ -69,11 +76,12 @@ public class FloatConfOption extends AbstractConfOption {
 
   /**
    * Set value if it's not already present
+   *
    * @param conf Configuration
    * @param value to set
    */
   public void setIfUnset(Configuration conf, float value) {
-    if (conf.get(getKey()) == null) {
+    if (!contains(conf)) {
       conf.setFloat(getKey(), value);
     }
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
index 621bb14..f039bdc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
@@ -20,34 +20,35 @@ package org.apache.giraph.conf;
 import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.aggregators.TextAggregatorWriter;
 import org.apache.giraph.combiner.Combiner;
-import org.apache.giraph.graph.Computation;
 import org.apache.giraph.edge.ByteArrayEdges;
 import org.apache.giraph.edge.OutEdges;
+import org.apache.giraph.graph.Computation;
+import org.apache.giraph.graph.ComputationFactory;
+import org.apache.giraph.graph.DefaultComputationFactory;
 import org.apache.giraph.graph.DefaultVertexResolver;
 import org.apache.giraph.graph.DefaultVertexValueFactory;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.graph.VertexValueFactory;
+import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.io.VertexInputFormat;
+import org.apache.giraph.io.VertexOutputFormat;
 import org.apache.giraph.io.filters.DefaultEdgeInputFilter;
 import org.apache.giraph.io.filters.DefaultVertexInputFilter;
 import org.apache.giraph.io.filters.EdgeInputFilter;
-import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.filters.VertexInputFilter;
-import org.apache.giraph.io.VertexInputFormat;
-import org.apache.giraph.io.VertexOutputFormat;
 import org.apache.giraph.master.DefaultMasterCompute;
 import org.apache.giraph.master.MasterCompute;
 import org.apache.giraph.partition.GraphPartitionerFactory;
 import org.apache.giraph.partition.HashPartitionerFactory;
 import org.apache.giraph.partition.Partition;
 import org.apache.giraph.partition.SimplePartition;
-import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.giraph.worker.DefaultWorkerContext;
 import org.apache.giraph.worker.WorkerContext;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-import java.util.List;
+import static org.apache.giraph.utils.ReflectionUtils.getTypeArguments;
 
 /**
  * Holder for classes used by Giraph.
@@ -60,20 +61,16 @@ import java.util.List;
 public class GiraphClasses<I extends WritableComparable,
     V extends Writable, E extends Writable>
     implements GiraphConstants {
+  /** ComputationFactory class - cached for fast access */
+  protected Class<? extends ComputationFactory<I, V, E,
+      ? extends Writable, ? extends Writable>>
+  computationFactoryClass;
   /** Computation class - cached for fast access */
   protected Class<? extends
       Computation<I, V, E, ? extends Writable, ? extends Writable>>
   computationClass;
-  /** Vertex id class - cached for fast access */
-  protected Class<I> vertexIdClass;
-  /** Vertex value class - cached for fast access */
-  protected Class<V> vertexValueClass;
-  /** Edge value class - cached for fast access */
-  protected Class<E> edgeValueClass;
-  /** Incoming message value class - cached for fast access */
-  protected Class<? extends Writable> incomingMessageValueClass;
-  /** Outgoing message value class - cached for fast access */
-  protected Class<? extends Writable> outgoingMessageValueClass;
+  /** Generic types used to describe graph */
+  protected GiraphTypes<I, V, E> giraphTypes;
   /** Vertex edges class - cached for fast access */
   protected Class<? extends OutEdges<I, E>> outEdgesClass;
   /** Input vertex edges class - cached for fast access */
@@ -123,6 +120,10 @@ public class GiraphClasses<I extends WritableComparable,
   public GiraphClasses() {
     // Note: the cast to Object is required in order for javac to accept the
     // downcast.
+    computationFactoryClass = (Class<? extends ComputationFactory<I, V, E,
+          ? extends Writable, ? extends Writable>>) (Object)
+        DefaultComputationFactory.class;
+    giraphTypes = new GiraphTypes<I, V, E>();
     outEdgesClass = (Class<? extends OutEdges<I, E>>) (Object)
         ByteArrayEdges.class;
     inputOutEdgesClass = (Class<? extends OutEdges<I, E>>) (Object)
@@ -151,27 +152,15 @@ public class GiraphClasses<I extends WritableComparable,
    * @param conf Configuration object to read from.
    */
   public GiraphClasses(Configuration conf) {
-    readFromConf(conf);
-  }
-
-  /**
-   * Read classes from Configuration.
-   *
-   * @param conf Configuration to read from.
-   */
-  private void readFromConf(Configuration conf) {
-    // set pre-validated generic parameter types into Configuration
+    giraphTypes = GiraphTypes.readFrom(conf);
+    computationFactoryClass =
+        (Class<? extends ComputationFactory<I, V, E,
+            ? extends Writable, ? extends Writable>>)
+            COMPUTATION_FACTORY_CLASS.get(conf);
     computationClass =
         (Class<? extends
             Computation<I, V, E, ? extends Writable, ? extends Writable>>)
             COMPUTATION_CLASS.get(conf);
-    List<Class<?>> classList =
-        ReflectionUtils.getTypeArguments(Computation.class, computationClass);
-    vertexIdClass = (Class<I>) classList.get(0);
-    vertexValueClass = (Class<V>) classList.get(1);
-    edgeValueClass = (Class<E>) classList.get(2);
-    incomingMessageValueClass = (Class<? extends Writable>) classList.get(3);
-    outgoingMessageValueClass = (Class<? extends Writable>) classList.get(4);
 
     outEdgesClass = (Class<? extends OutEdges<I, E>>)
         VERTEX_EDGES_CLASS.get(conf);
@@ -207,6 +196,11 @@ public class GiraphClasses<I extends WritableComparable,
         VERTEX_INPUT_FILTER_CLASS.get(conf);
   }
 
+  public Class<? extends ComputationFactory<I, V, E,
+      ? extends Writable, ? extends Writable>> getComputationFactoryClass() {
+    return computationFactoryClass;
+  }
+
   /**
    * Get Computation class
    *
@@ -218,13 +212,17 @@ public class GiraphClasses<I extends WritableComparable,
     return computationClass;
   }
 
+  public GiraphTypes<I, V, E> getGiraphTypes() {
+    return giraphTypes;
+  }
+
   /**
    * Get Vertex ID class
    *
    * @return Vertex ID class
    */
   public Class<I> getVertexIdClass() {
-    return vertexIdClass;
+    return giraphTypes.getVertexIdClass();
   }
 
   /**
@@ -233,7 +231,7 @@ public class GiraphClasses<I extends WritableComparable,
    * @return Vertex Value class
    */
   public Class<V> getVertexValueClass() {
-    return vertexValueClass;
+    return giraphTypes.getVertexValueClass();
   }
 
   /**
@@ -242,7 +240,7 @@ public class GiraphClasses<I extends WritableComparable,
    * @return Edge Value class
    */
   public Class<E> getEdgeValueClass() {
-    return edgeValueClass;
+    return giraphTypes.getEdgeValueClass();
   }
 
   /**
@@ -252,7 +250,7 @@ public class GiraphClasses<I extends WritableComparable,
    * @return Message Value class
    */
   public Class<? extends Writable> getIncomingMessageValueClass() {
-    return incomingMessageValueClass;
+    return giraphTypes.getIncomingMessageValueClass();
   }
 
   /**
@@ -262,7 +260,7 @@ public class GiraphClasses<I extends WritableComparable,
    * @return Message Value class
    */
   public Class<? extends Writable> getOutgoingMessageValueClass() {
-    return outgoingMessageValueClass;
+    return giraphTypes.getOutgoingMessageValueClass();
   }
 
   /**
@@ -274,10 +272,11 @@ public class GiraphClasses<I extends WritableComparable,
     return outEdgesClass;
   }
 
-  /* Get Vertex edges class used during edge-based input
- *
- * @return Vertex edges class.
- */
+  /**
+   * Get Vertex edges class used during edge-based input
+   *
+   * @return Vertex edges class.
+   */
   public Class<? extends OutEdges<I, E>> getInputOutEdgesClass() {
     return inputOutEdgesClass;
   }
@@ -486,10 +485,14 @@ public class GiraphClasses<I extends WritableComparable,
       Computation<I, V, E, ? extends Writable, ? extends Writable>>
       computationClass) {
     this.computationClass = computationClass;
-    List<Class<?>> classList =
-        ReflectionUtils.getTypeArguments(Computation.class, computationClass);
-    incomingMessageValueClass = (Class<? extends Writable>) classList.get(3);
-    outgoingMessageValueClass = (Class<? extends Writable>) classList.get(4);
+    if (computationClass != null) {
+      Class<?>[] classList =
+          getTypeArguments(TypesHolder.class, computationClass);
+      giraphTypes.setIncomingMessageValueClass(
+          (Class<? extends Writable>) classList[3]);
+      giraphTypes.setOutgoingMessageValueClass(
+          (Class<? extends Writable>) classList[4]);
+    }
     return this;
   }
 
@@ -500,7 +503,7 @@ public class GiraphClasses<I extends WritableComparable,
    * @return this
    */
   public GiraphClasses setVertexIdClass(Class<I> vertexIdClass) {
-    this.vertexIdClass = vertexIdClass;
+    giraphTypes.setVertexIdClass(vertexIdClass);
     return this;
   }
 
@@ -511,7 +514,7 @@ public class GiraphClasses<I extends WritableComparable,
    * @return this
    */
   public GiraphClasses setVertexValueClass(Class<V> vertexValueClass) {
-    this.vertexValueClass = vertexValueClass;
+    giraphTypes.setVertexValueClass(vertexValueClass);
     return this;
   }
 
@@ -522,7 +525,7 @@ public class GiraphClasses<I extends WritableComparable,
    * @return this
    */
   public GiraphClasses setEdgeValueClass(Class<E> edgeValueClass) {
-    this.edgeValueClass = edgeValueClass;
+    giraphTypes.setEdgeValueClass(edgeValueClass);
     return this;
   }
 
@@ -535,7 +538,7 @@ public class GiraphClasses<I extends WritableComparable,
    */
   public GiraphClasses setIncomingMessageValueClass(
       Class<? extends Writable> incomingMessageValueClass) {
-    this.incomingMessageValueClass = incomingMessageValueClass;
+    giraphTypes.setIncomingMessageValueClass(incomingMessageValueClass);
     return this;
   }
 
@@ -548,7 +551,7 @@ public class GiraphClasses<I extends WritableComparable,
    */
   public GiraphClasses setOutgoingMessageValueClass(
       Class<? extends Writable> outgoingMessageValueClass) {
-    this.outgoingMessageValueClass = outgoingMessageValueClass;
+    giraphTypes.setOutgoingMessageValueClass(outgoingMessageValueClass);
     return this;
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 58a3f01..87c4c18 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -20,9 +20,10 @@ package org.apache.giraph.conf;
 
 import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.combiner.Combiner;
-import org.apache.giraph.graph.Computation;
 import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.edge.ReuseObjectsOutEdges;
+import org.apache.giraph.graph.Computation;
+import org.apache.giraph.graph.ComputationFactory;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.graph.VertexValueFactory;
 import org.apache.giraph.io.EdgeInputFormat;
@@ -36,6 +37,7 @@ import org.apache.giraph.master.MasterObserver;
 import org.apache.giraph.partition.GraphPartitionerFactory;
 import org.apache.giraph.partition.Partition;
 import org.apache.giraph.partition.ReusesObjectsPartition;
+import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.giraph.worker.WorkerContext;
 import org.apache.giraph.worker.WorkerObserver;
 import org.apache.hadoop.conf.Configuration;
@@ -78,6 +80,27 @@ public class GiraphConfiguration extends Configuration
   }
 
   /**
+   * Get name of computation being run. The configured
+   * {@link ComputationFactory} is instantiated and decides what to return.
+   *
+   * @return Name of computation as reported by the computation factory
+   */
+  public String getComputationName() {
+    ComputationFactory compFactory = ReflectionUtils.newInstance(
+        getComputationFactoryClass());
+    return compFactory.computationName(this);
+  }
+
+  /**
+   * Get the user's subclassed {@link ComputationFactory}
+   *
+   * @return User's computation factory class
+   */
+  public Class<? extends ComputationFactory> getComputationFactoryClass() {
+    return COMPUTATION_FACTORY_CLASS.get(this);
+  }
+
+  /**
    * Get the user's subclassed {@link Computation}
    *
    * @return User's computation class
@@ -525,6 +548,7 @@ public class GiraphConfiguration extends Configuration
 
   /**
    * Getter for SPLIT_MASTER_WORKER flag.
+   *
    * @return boolean flag value.
    */
   public final boolean getSplitMasterWorker() {
@@ -570,6 +594,7 @@ public class GiraphConfiguration extends Configuration
   /**
    * Is this a "pure YARN" Giraph job, or is a MapReduce layer (v1 or v2)
    * actually managing our cluster nodes, i.e. each task is a Mapper.
+   *
    * @return TRUE if this is a pure YARN job.
    */
   public boolean isPureYarnJob() {
@@ -579,6 +604,7 @@ public class GiraphConfiguration extends Configuration
   /**
    * Jars required in "Pure YARN" jobs (names only, no paths) should
    * be listed here in full, including Giraph framework jar(s).
+   *
    * @return the comma-separated list of jar names for export to cluster.
    */
   public String getYarnLibJars() {
@@ -587,6 +613,7 @@ public class GiraphConfiguration extends Configuration
 
   /**
    * Populate jar list for Pure YARN jobs.
+   *
    * @param jarList a comma-separated list of jar names
    */
   public void setYarnLibJars(String jarList) {
@@ -596,6 +623,7 @@ public class GiraphConfiguration extends Configuration
   /**
    * Get heap size (in MB) for each task in our Giraph job run,
    * assuming this job will run on the "pure YARN" profile.
+   *
    * @return the heap size for all tasks, in MB
    */
   public int getYarnTaskHeapMb() {
@@ -605,6 +633,7 @@ public class GiraphConfiguration extends Configuration
   /**
    * Set heap size for Giraph tasks in our job run, assuming
    * the job will run on the "pure YARN" profile.
+   *
    * @param heapMb the heap size for all tasks
    */
   public void setYarnTaskHeapMb(int heapMb) {
@@ -635,6 +664,7 @@ public class GiraphConfiguration extends Configuration
 
   /**
    * is this job run a local test?
+   *
    * @return the test status as recorded in the Configuration
    */
   public boolean getLocalTestMode() {
@@ -643,6 +673,7 @@ public class GiraphConfiguration extends Configuration
 
   /**
    * Flag this job as a local test run.
+   *
    * @param flag the test status for this job
    */
   public void setLocalTestMode(boolean flag) {
@@ -652,6 +683,7 @@ public class GiraphConfiguration extends Configuration
   /**
    * The number of server tasks in our ZK quorum for
    * this job run.
+   *
    * @return the number of ZK servers in the quorum
    */
   public int getZooKeeperServerCount() {
@@ -1014,6 +1046,7 @@ public class GiraphConfiguration extends Configuration
 
   /**
    * Get the output directory to write YourKit snapshots to
+   *
    * @param context Map context
    * @return output directory
    */

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 2d0f59c..f090baa 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -20,11 +20,14 @@ package org.apache.giraph.conf;
 import org.apache.giraph.aggregators.AggregatorWriter;
 import org.apache.giraph.aggregators.TextAggregatorWriter;
 import org.apache.giraph.combiner.Combiner;
-import org.apache.giraph.graph.Computation;
 import org.apache.giraph.edge.ByteArrayEdges;
 import org.apache.giraph.edge.OutEdges;
+import org.apache.giraph.graph.Computation;
+import org.apache.giraph.graph.ComputationFactory;
+import org.apache.giraph.graph.DefaultComputationFactory;
 import org.apache.giraph.graph.DefaultVertexResolver;
 import org.apache.giraph.graph.DefaultVertexValueFactory;
+import org.apache.giraph.graph.Language;
 import org.apache.giraph.graph.VertexResolver;
 import org.apache.giraph.graph.VertexValueFactory;
 import org.apache.giraph.io.EdgeInputFormat;
@@ -64,6 +67,14 @@ public interface GiraphConstants {
   ClassConfOption<Computation> COMPUTATION_CLASS =
       ClassConfOption.create("giraph.computationClass", null,
           Computation.class);
+  /** Computation factory class - optional */
+  ClassConfOption<ComputationFactory> COMPUTATION_FACTORY_CLASS =
+      ClassConfOption.create("giraph.computation.factory.class",
+          DefaultComputationFactory.class, ComputationFactory.class);
+  /** TypesHolder, used if Computation not set - optional */
+  ClassConfOption<TypesHolder> TYPES_HOLDER_CLASS =
+      ClassConfOption.create("giraph.typesHolder", null,
+          TypesHolder.class);
   /** Vertex value factory class - optional */
   ClassConfOption<VertexValueFactory> VERTEX_VALUE_FACTORY_CLASS =
       ClassConfOption.create("giraph.vertexValueFactoryClass",
@@ -97,6 +108,11 @@ public interface GiraphConstants {
       ClassConfOption.create("giraph.vertexResolverClass",
           DefaultVertexResolver.class, VertexResolver.class);
 
+  /** Which language computation is implemented in */
+  EnumConfOption<Language> COMPUTATION_LANGUAGE =
+      EnumConfOption.create("giraph.computation.language",
+          Language.class, Language.JAVA);
+
   /**
    * Option of whether to create vertexes that were not existent before but
    * received messages
@@ -142,7 +158,7 @@ public interface GiraphConstants {
    * application, saveVertex will be called right after each vertex.compute()
    * is called.
    * NOTE: This feature doesn't work well with checkpointing - if you restart
-   * from a checkpoint you won't have any ouptut from previous supresteps.
+   * from a checkpoint you won't have any output from previous supersteps.
    */
   BooleanConfOption DO_OUTPUT_DURING_COMPUTATION =
       new BooleanConfOption("giraph.doOutputDuringComputation", false);

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/conf/GiraphTypes.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphTypes.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphTypes.java
new file mode 100644
index 0000000..0727270
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphTypes.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.conf;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.base.Preconditions;
+
+import static org.apache.giraph.conf.GiraphConstants.EDGE_VALUE_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.INCOMING_MESSAGE_VALUE_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.OUTGOING_MESSAGE_VALUE_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.VERTEX_ID_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.VERTEX_VALUE_CLASS;
+import static org.apache.giraph.utils.ConfigurationUtils.getTypesHolderClass;
+import static org.apache.giraph.utils.ReflectionUtils.getTypeArguments;
+
+/**
+ * Holder for the generic types that describe user's graph.
+ *
+ * @param <I> Vertex ID class
+ * @param <V> Vertex Value class
+ * @param <E> Edge Value class
+ */
+public class GiraphTypes<I extends WritableComparable, V extends Writable,
+    E extends Writable> {
+  /** Vertex id class */
+  private Class<I> vertexIdClass;
+  /** Vertex value class */
+  private Class<V> vertexValueClass;
+  /** Edge value class */
+  private Class<E> edgeValueClass;
+  /** Incoming message value class */
+  private Class<? extends Writable> incomingMessageValueClass;
+  /** Outgoing message value class */
+  private Class<? extends Writable> outgoingMessageValueClass;
+
+  /**
+   * Empty Constructor
+   */
+  public GiraphTypes() { }
+
+  /**
+   * Constructor taking values
+   *
+   * @param vertexIdClass vertex id class
+   * @param vertexValueClass vertex value class
+   * @param edgeValueClass edge value class
+   * @param incomingMessageValueClass incoming message class
+   * @param outgoingMessageValueClass outgoing message class
+   */
+  public GiraphTypes(Class<I> vertexIdClass,
+      Class<V> vertexValueClass,
+      Class<E> edgeValueClass,
+      Class<? extends Writable> incomingMessageValueClass,
+      Class<? extends Writable> outgoingMessageValueClass) {
+    this.edgeValueClass = edgeValueClass;
+    this.incomingMessageValueClass = incomingMessageValueClass;
+    this.outgoingMessageValueClass = outgoingMessageValueClass;
+    this.vertexIdClass = vertexIdClass;
+    this.vertexValueClass = vertexValueClass;
+  }
+
+  /**
+   * Read types from a {@link Configuration}.
+   * First tries to read them directly from the configuration options.
+   * If that doesn't work, tries to infer from {@link TypesHolder}.
+   *
+   * @param conf Configuration
+   * @param <IX> vertex id
+   * @param <VX> vertex value
+   * @param <EX> edge value
+   * @return GiraphTypes
+   */
+  public static <IX extends WritableComparable, VX extends Writable,
+        EX extends Writable> GiraphTypes<IX, VX, EX> readFrom(
+      Configuration conf) {
+    GiraphTypes<IX, VX, EX> types = new GiraphTypes<IX, VX, EX>();
+    types.readDirect(conf);
+    if (!types.hasData()) {
+      Class<? extends TypesHolder> klass = getTypesHolderClass(conf);
+      if (klass != null) {
+        types.inferFrom(klass);
+      }
+    }
+    return types;
+  }
+
+  /**
+   * Infer types from a TypesHolder class (e.g. the user's Computation)
+   *
+   * @param klass TypesHolder class
+   */
+  public void inferFrom(Class<? extends TypesHolder> klass) {
+    Class<?>[] classList = getTypeArguments(TypesHolder.class, klass);
+    Preconditions.checkArgument(classList.length == 5);
+    vertexIdClass = (Class<I>) classList[0];
+    vertexValueClass = (Class<V>) classList[1];
+    edgeValueClass = (Class<E>) classList[2];
+    incomingMessageValueClass = (Class<? extends Writable>) classList[3];
+    outgoingMessageValueClass = (Class<? extends Writable>) classList[4];
+  }
+
+  /**
+   * Read types directly from Configuration
+   *
+   * @param conf Configuration
+   */
+  private void readDirect(Configuration conf) {
+    vertexIdClass = (Class<I>) VERTEX_ID_CLASS.get(conf);
+    vertexValueClass = (Class<V>) VERTEX_VALUE_CLASS.get(conf);
+    edgeValueClass = (Class<E>) EDGE_VALUE_CLASS.get(conf);
+    incomingMessageValueClass = INCOMING_MESSAGE_VALUE_CLASS.get(conf);
+    outgoingMessageValueClass = OUTGOING_MESSAGE_VALUE_CLASS.get(conf);
+  }
+
+  /**
+   * Check if types are set
+   *
+   * @return true if types are set
+   */
+  public boolean hasData() {
+    return vertexIdClass != null &&
+        vertexValueClass != null &&
+        edgeValueClass != null &&
+        incomingMessageValueClass != null &&
+        outgoingMessageValueClass != null;
+  }
+
+  /**
+   * Write types to Configuration
+   *
+   * @param conf Configuration
+   */
+  public void writeTo(Configuration conf) {
+    VERTEX_ID_CLASS.set(conf, vertexIdClass);
+    VERTEX_VALUE_CLASS.set(conf, vertexValueClass);
+    EDGE_VALUE_CLASS.set(conf, edgeValueClass);
+    INCOMING_MESSAGE_VALUE_CLASS.set(conf, incomingMessageValueClass);
+    OUTGOING_MESSAGE_VALUE_CLASS.set(conf, outgoingMessageValueClass);
+  }
+
+  /**
+   * Write types to Configuration if not already set
+   *
+   * @param conf Configuration
+   */
+  public void writeIfUnset(Configuration conf) {
+    VERTEX_ID_CLASS.setIfUnset(conf, vertexIdClass);
+    VERTEX_VALUE_CLASS.setIfUnset(conf, vertexValueClass);
+    EDGE_VALUE_CLASS.setIfUnset(conf, edgeValueClass);
+    INCOMING_MESSAGE_VALUE_CLASS.setIfUnset(conf, incomingMessageValueClass);
+    OUTGOING_MESSAGE_VALUE_CLASS.setIfUnset(conf, outgoingMessageValueClass);
+  }
+
+  public Class<E> getEdgeValueClass() {
+    return edgeValueClass;
+  }
+
+  public Class<? extends Writable> getIncomingMessageValueClass() {
+    return incomingMessageValueClass;
+  }
+
+  public Class<? extends Writable> getOutgoingMessageValueClass() {
+    return outgoingMessageValueClass;
+  }
+
+  public Class<I> getVertexIdClass() {
+    return vertexIdClass;
+  }
+
+  public Class<V> getVertexValueClass() {
+    return vertexValueClass;
+  }
+
+  public void setEdgeValueClass(Class<E> edgeValueClass) {
+    this.edgeValueClass = edgeValueClass;
+  }
+
+  public void setIncomingMessageValueClass(
+      Class<? extends Writable> incomingMessageValueClass) {
+    this.incomingMessageValueClass = incomingMessageValueClass;
+  }
+
+  public void setOutgoingMessageValueClass(
+      Class<? extends Writable> outgoingMessageValueClass) {
+    this.outgoingMessageValueClass = outgoingMessageValueClass;
+  }
+
+  public void setVertexIdClass(Class<I> vertexIdClass) {
+    this.vertexIdClass = vertexIdClass;
+  }
+
+  public void setVertexValueClass(Class<V> vertexValueClass) {
+    this.vertexValueClass = vertexValueClass;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index dbd2452..64d0fb2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -24,6 +24,7 @@ import org.apache.giraph.edge.Edge;
 import org.apache.giraph.edge.EdgeFactory;
 import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.edge.ReusableEdge;
+import org.apache.giraph.graph.ComputationFactory;
 import org.apache.giraph.graph.DefaultVertex;
 import org.apache.giraph.graph.Computation;
 import org.apache.giraph.graph.Vertex;
@@ -114,6 +115,7 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
 
   /**
    * Configure an object with this instance if the object is configurable.
+   *
    * @param obj Object
    */
   public void configureIfPossible(Object obj) {
@@ -134,6 +136,7 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
 
   /**
    * Get the edge input filter to use
+   *
    * @return EdgeInputFilter
    */
   public EdgeInputFilter getEdgeInputFilter() {
@@ -152,6 +155,7 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
 
   /**
    * Get the vertex input filter to use
+   *
    * @return VertexInputFilter
    */
   public VertexInputFilter getVertexInputFilter() {
@@ -450,13 +454,44 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
   }
 
   /**
+   * Get computation factory class
+   *
+   * @return computation factory class
+   */
+  @Override
+  public Class<? extends ComputationFactory<I, V, E,
+      ? extends Writable, ? extends Writable>>
+  getComputationFactoryClass() {
+    return classes.getComputationFactoryClass();
+  }
+
+  /**
+   * Create computation factory
+   *
+   * @return newly instantiated computation factory
+   */
+  public ComputationFactory<I, V, E, ? extends Writable, ? extends Writable>
+  createComputationFactory() {
+    return ReflectionUtils.newInstance(getComputationFactoryClass(), this);
+  }
+
+  /**
    * Create a user computation
    *
    * @return Instantiated user computation
    */
   public Computation<I, V, E, ? extends Writable, ? extends Writable>
   createComputation() {
-    return ReflectionUtils.newInstance(getComputationClass(), this);
+    return createComputationFactory().getComputation(this);
+  }
+
+  /**
+   * Get user types describing graph (I,V,E,M1,M2)
+   *
+   * @return GiraphTypes
+   */
+  public GiraphTypes<I, V, E> getGiraphTypes() {
+    return classes.getGiraphTypes();
   }
 
   /**
@@ -554,6 +589,7 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
 
   /**
    * Create job observer
+   *
    * @return GiraphJobObserver set in configuration.
    */
   public GiraphJobObserver getJobObserver() {

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/conf/IntConfOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/IntConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/IntConfOption.java
index de75e9d..142c090 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/IntConfOption.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/IntConfOption.java
@@ -28,6 +28,7 @@ public class IntConfOption extends AbstractConfOption {
 
   /**
    * Constructor
+   *
    * @param key key
    * @param defaultValue default value
    */
@@ -39,6 +40,7 @@ public class IntConfOption extends AbstractConfOption {
 
   /**
    * Constructor
+   *
    * @param key key
    * @param defaultValue default value
    */
@@ -52,6 +54,10 @@ public class IntConfOption extends AbstractConfOption {
     return defaultValue;
   }
 
+  @Override public boolean isDefaultValue(Configuration conf) {
+    return get(conf) == defaultValue;
+  }
+
   @Override public String getDefaultValueStr() {
     return Integer.toString(defaultValue);
   }
@@ -62,6 +68,7 @@ public class IntConfOption extends AbstractConfOption {
 
   /**
    * Lookup value
+   *
    * @param conf Configuration
    * @return value for key, or default value if not set
    */
@@ -71,6 +78,7 @@ public class IntConfOption extends AbstractConfOption {
 
   /**
    * Set value
+   *
    * @param conf Configuration
    * @param value to set
    */
@@ -80,11 +88,12 @@ public class IntConfOption extends AbstractConfOption {
 
   /**
    * Set value if it's not already present
+   *
    * @param conf Configuration
    * @param value to set
    */
   public void setIfUnset(Configuration conf, int value) {
-    if (conf.get(getKey()) == null) {
+    if (!contains(conf)) {
       conf.setInt(getKey(), value);
     }
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/conf/LongConfOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/LongConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/LongConfOption.java
index 0cbc164..a5c939d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/LongConfOption.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/LongConfOption.java
@@ -28,6 +28,7 @@ public class LongConfOption extends AbstractConfOption {
 
   /**
    * Constructor
+   *
    * @param key key
    * @param defaultValue default value
    */
@@ -41,6 +42,10 @@ public class LongConfOption extends AbstractConfOption {
     return defaultValue;
   }
 
+  @Override public boolean isDefaultValue(Configuration conf) {
+    return get(conf) == defaultValue;
+  }
+
   @Override public String getDefaultValueStr() {
     return Long.toString(defaultValue);
   }
@@ -51,6 +56,7 @@ public class LongConfOption extends AbstractConfOption {
 
   /**
    * Lookup value
+   *
    * @param conf Configuration
    * @return value set for key, or defaultValue
    */
@@ -59,7 +65,19 @@ public class LongConfOption extends AbstractConfOption {
   }
 
   /**
+   * Lookup value, use passed in default value if not found.
+   *
+   * @param conf Configuration
+   * @param val default value to use
+   * @return value set for key, or default value passed in
+   */
+  public long getWithDefault(Configuration conf, long val) {
+    return conf.getLong(getKey(), val);
+  }
+
+  /**
    * Set value for key
+   *
    * @param conf Configuration
    * @param value to set
    */
@@ -69,11 +87,12 @@ public class LongConfOption extends AbstractConfOption {
 
   /**
    * Set value if it's not already present
+   *
    * @param conf Configuration
    * @param value to set
    */
   public void setIfUnset(Configuration conf, long value) {
-    if (conf.get(getKey()) == null) {
+    if (!contains(conf)) {
       conf.setLong(getKey(), value);
     }
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/conf/StrConfOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/StrConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/StrConfOption.java
index 83a583d..7c3a993 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/StrConfOption.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/StrConfOption.java
@@ -19,6 +19,7 @@ package org.apache.giraph.conf;
 
 import org.apache.hadoop.conf.Configuration;
 
+import com.google.common.base.Objects;
 import com.google.common.collect.Lists;
 
 import java.util.List;
@@ -32,6 +33,7 @@ public class StrConfOption extends AbstractConfOption {
 
   /**
    * Constructor
+   *
    * @param key key
    * @param defaultValue default value
    */
@@ -45,6 +47,10 @@ public class StrConfOption extends AbstractConfOption {
     return defaultValue;
   }
 
+  @Override public boolean isDefaultValue(Configuration conf) {
+    return Objects.equal(get(conf), defaultValue);
+  }
+
   @Override public String getDefaultValueStr() {
     return defaultValue;
   }
@@ -55,6 +61,7 @@ public class StrConfOption extends AbstractConfOption {
 
   /**
    * Lookup value
+   *
    * @param conf Configuration
    * @return value for key, or defaultValue
    */
@@ -64,6 +71,7 @@ public class StrConfOption extends AbstractConfOption {
 
   /**
    * Lookup value with user defined defaultValue
+   *
    * @param conf Configuration
    * @param defaultVal default value to use
    * @return value for key, or defaultVal passed in
@@ -74,6 +82,7 @@ public class StrConfOption extends AbstractConfOption {
 
   /**
    * Get array of values for key
+   *
    * @param conf Configuration
    * @return array of values for key
    */
@@ -83,6 +92,7 @@ public class StrConfOption extends AbstractConfOption {
 
   /**
    * Get list of values for key
+   *
    * @param conf Configuration
    * @return list of values for key
    */
@@ -92,6 +102,7 @@ public class StrConfOption extends AbstractConfOption {
 
   /**
    * Set value for key
+   *
    * @param conf Configuration
    * @param value to set
    */
@@ -101,6 +112,7 @@ public class StrConfOption extends AbstractConfOption {
 
   /**
    * Set value if not already present
+   *
    * @param conf Configuration
    * @param value to set
    */

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/conf/TypesHolder.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/TypesHolder.java b/giraph-core/src/main/java/org/apache/giraph/conf/TypesHolder.java
new file mode 100644
index 0000000..0deb79f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/TypesHolder.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.conf;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Interface for classes that are parameterized by all of the Giraph types.
+ * These classes can be used to infer the types used by {@link GiraphTypes}
+ *
+ * @param <I> Vertex ID
+ * @param <V> Vertex Value
+ * @param <E> Edge Value
+ * @param <M1> Incoming Message Value
+ * @param <M2> Outgoing Message Value
+ */
+public interface TypesHolder<I extends WritableComparable,
+    V extends Writable, E extends Writable, M1 extends Writable,
+    M2 extends Writable> {
+  // Nothing here
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java b/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java
index 84158df..87d5879 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java
@@ -20,6 +20,7 @@ package org.apache.giraph.graph;
 
 import org.apache.giraph.comm.WorkerClientRequestProcessor;
 import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.TypesHolder;
 import org.apache.giraph.edge.Edge;
 import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.worker.WorkerAggregatorUsage;
@@ -27,6 +28,7 @@ import org.apache.giraph.worker.WorkerContext;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.log4j.Logger;
 
 import java.io.IOException;
 
@@ -53,7 +55,10 @@ public abstract class Computation<I extends WritableComparable,
     V extends Writable, E extends Writable, M1 extends Writable,
     M2 extends Writable>
     extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
-    implements WorkerAggregatorUsage {
+    implements TypesHolder<I, V, E, M1, M2>, WorkerAggregatorUsage {
+  /** Logger */
+  private static final Logger LOG = Logger.getLogger(Computation.class);
+
   /** Global graph state **/
   private GraphState graphState;
   /** Handles requests */
@@ -101,7 +106,7 @@ public abstract class Computation<I extends WritableComparable,
    * @param workerAggregatorUsage Worker aggregator usage
    * @param workerContext Worker context
    */
-  public final void initialize(
+  public void initialize(
       GraphState graphState,
       WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor,
       GraphTaskManager<I, V, E> graphTaskManager,

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/graph/ComputationFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputationFactory.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputationFactory.java
new file mode 100644
index 0000000..d23db05
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputationFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.graph;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Factory for creating Computations
+ *
+ * @param <I> Vertex ID
+ * @param <V> Vertex Value
+ * @param <E> Edge Value
+ * @param <M1> Incoming Message Value
+ * @param <M2> Outgoing Message Value
+ */
+public interface ComputationFactory<I extends WritableComparable,
+    V extends Writable, E extends Writable, M1 extends Writable,
+    M2 extends Writable> {
+  /**
+   * One time initialization before compute calls.
+   * Guaranteed to be called from only one thread before computation begins.
+   *
+   * @param conf Configuration
+   */
+  void initComputation(ImmutableClassesGiraphConfiguration<I, V, E> conf);
+
+  /**
+   * Get Computation object
+   *
+   * @param conf Configuration
+   * @return Computation
+   */
+  Computation<I, V, E, M1, M2> getComputation(
+      ImmutableClassesGiraphConfiguration<I, V, E> conf);
+
+  /**
+   * Check that the Configuration passed in is setup correctly to run a job.
+   *
+   * @param conf Configuration to check.
+   */
+  void checkConfiguration(
+      ImmutableClassesGiraphConfiguration<I, V, E> conf);
+
+  /**
+   * Get name of this particular computation
+   *
+   * @param conf Configuration
+   * @return String name of computation
+   */
+  String computationName(GiraphConfiguration conf);
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/graph/DefaultComputationFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/DefaultComputationFactory.java b/giraph-core/src/main/java/org/apache/giraph/graph/DefaultComputationFactory.java
new file mode 100644
index 0000000..405b272
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/DefaultComputationFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.graph;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Default computation factory that simply creates a Java computation object
+ *
+ * @param <I> Vertex ID
+ * @param <V> Vertex Value
+ * @param <E> Edge Value
+ */
+public class DefaultComputationFactory<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    implements ComputationFactory<I, V, E, Writable, Writable> {
+  @Override
+  public void initComputation(
+      ImmutableClassesGiraphConfiguration<I, V, E> conf) {
+    // Nothing to do here
+  }
+
+  @Override
+  public Computation<I, V, E, Writable, Writable> getComputation(
+      ImmutableClassesGiraphConfiguration<I, V, E> conf) {
+    Class<? extends Computation> klass = conf.getComputationClass();
+    return ReflectionUtils.newInstance(klass, conf);
+  }
+
+  @Override
+  public void checkConfiguration(
+      ImmutableClassesGiraphConfiguration<I, V, E> conf) {
+    if (conf.getComputationClass() == null) {
+      throw new IllegalArgumentException("checkConfiguration: Null " +
+          GiraphConstants.COMPUTATION_CLASS.getKey());
+    }
+  }
+
+  @Override
+  public String computationName(GiraphConfiguration conf) {
+    return conf.getComputationClass().getSimpleName();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 99b28df..435dd87 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -40,7 +40,6 @@ import org.apache.giraph.time.Time;
 import org.apache.giraph.utils.CallableFactory;
 import org.apache.giraph.utils.MemoryUtils;
 import org.apache.giraph.utils.ProgressableUtils;
-import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.giraph.worker.BspServiceWorker;
 import org.apache.giraph.worker.InputSplitsCallable;
 import org.apache.giraph.worker.WorkerContext;
@@ -58,7 +57,6 @@ import org.apache.log4j.Logger;
 import org.apache.log4j.PatternLayout;
 
 import java.io.IOException;
-import java.lang.reflect.Type;
 import java.net.URL;
 import java.net.URLDecoder;
 import java.util.ArrayList;
@@ -70,12 +68,6 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.giraph.conf.GiraphConstants.EDGE_VALUE_CLASS;
-import static org.apache.giraph.conf.GiraphConstants.INCOMING_MESSAGE_VALUE_CLASS;
-import static org.apache.giraph.conf.GiraphConstants.VERTEX_ID_CLASS;
-import static org.apache.giraph.conf.GiraphConstants.VERTEX_VALUE_CLASS;
-import static org.apache.giraph.conf.GiraphConstants.OUTGOING_MESSAGE_VALUE_CLASS;
-
 /**
  * The Giraph-specific business logic for a single BSP
  * compute node in whatever underlying type of cluster
@@ -175,13 +167,18 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
   public void setup(Path[] zkPathList)
     throws IOException, InterruptedException {
     context.setStatus("setup: Beginning worker setup.");
-    conf = new ImmutableClassesGiraphConfiguration<I, V, E>(
-      context.getConfiguration());
-    determineClassTypes(conf);
+    Configuration hadoopConf = context.getConfiguration();
+    conf = new ImmutableClassesGiraphConfiguration<I, V, E>(hadoopConf);
+    // Write user's graph types (I,V,E,M) back to configuration parameters so
+    // that they are set for quicker access later. These types are often
+    // inferred from the Computation class used.
+    conf.getGiraphTypes().writeIfUnset(conf);
     // configure global logging level for Giraph job
     initializeAndConfigureLogging();
     // init the metrics objects
     setupAndInitializeGiraphMetrics();
+    // One time setup for computation factory
+    conf.createComputationFactory().initComputation(conf);
     // Do some task setup (possibly starting up a Zookeeper service)
     context.setStatus("setup: Initializing Zookeeper services.");
     locateZookeeperClasspath(zkPathList);
@@ -437,32 +434,6 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
     return serviceWorker.getWorkerContext();
   }
 
- /**
-   * Set the concrete, user-defined choices about generic methods
-   * (validated earlier in GiraphRunner) into the Configuration.
-   * @param conf the Configuration object for this job run.
-   */
-  public void determineClassTypes(Configuration conf) {
-    ImmutableClassesGiraphConfiguration giraphConf =
-        new ImmutableClassesGiraphConfiguration(conf);
-    Class<? extends Computation<I, V, E, Writable, Writable>> computationClass =
-        giraphConf.getComputationClass();
-    List<Class<?>> classList = ReflectionUtils.<Computation>getTypeArguments(
-        Computation.class, computationClass);
-    Type vertexIndexType = classList.get(0);
-    Type vertexValueType = classList.get(1);
-    Type edgeValueType = classList.get(2);
-    Type incomingMessageValueType = classList.get(3);
-    Type outgoingMessageValueType = classList.get(4);
-    VERTEX_ID_CLASS.set(conf, (Class<WritableComparable>) vertexIndexType);
-    VERTEX_VALUE_CLASS.set(conf, (Class<Writable>) vertexValueType);
-    EDGE_VALUE_CLASS.set(conf, (Class<Writable>) edgeValueType);
-    INCOMING_MESSAGE_VALUE_CLASS.set(conf,
-        (Class<Writable>) incomingMessageValueType);
-    OUTGOING_MESSAGE_VALUE_CLASS.set(conf,
-        (Class<Writable>) outgoingMessageValueType);
-  }
-
   /**
    * Copied from JobConf to get the location of this jar.  Workaround for
    * things like Oozie map-reduce jobs. NOTE: Pure YARN profile cannot

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/graph/Language.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/Language.java b/giraph-core/src/main/java/org/apache/giraph/graph/Language.java
new file mode 100644
index 0000000..29e0a84
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/Language.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.graph;
+
+/**
+ * Programming languages that an implementation can be written in
+ */
+public enum Language {
+  /** Java */
+  JAVA,
+  /** Jython (Python running on the JVM) */
+  JYTHON
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntNullTextInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntNullTextInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntNullTextInputFormat.java
index 91725f8..b15f222 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntNullTextInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntNullTextInputFormat.java
@@ -18,7 +18,6 @@
 
 package org.apache.giraph.io.formats;
 
-import com.google.common.collect.Lists;
 import org.apache.giraph.edge.Edge;
 import org.apache.giraph.edge.EdgeFactory;
 import org.apache.hadoop.io.IntWritable;
@@ -27,6 +26,8 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
+import com.google.common.collect.Lists;
+
 import java.io.IOException;
 import java.util.List;
 import java.util.regex.Pattern;
@@ -46,13 +47,13 @@ public class IntIntNullTextInputFormat extends
   public TextVertexReader createVertexReader(InputSplit split,
       TaskAttemptContext context)
     throws IOException {
-    return new IntIntNullIntVertexReader();
+    return new IntIntNullVertexReader();
   }
 
   /**
    * Vertex reader associated with {@link IntIntNullTextInputFormat}.
    */
-  public class IntIntNullIntVertexReader extends
+  public class IntIntNullVertexReader extends
     TextVertexReaderFromEachLineProcessed<String[]> {
     /**
      * Cached vertex id for the current line

http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/io/formats/LongLongNullTextInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/LongLongNullTextInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/LongLongNullTextInputFormat.java
new file mode 100644
index 0000000..4d47862
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/LongLongNullTextInputFormat.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.formats;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * Simple text-based {@link org.apache.giraph.io.VertexInputFormat} for
+ * unweighted graphs with long ids; each vertex value is its own id.
+ *
+ * Line format (tab or space separated): vertex neighbor1 neighbor2 ...
+ */
+public class LongLongNullTextInputFormat extends
+    TextVertexInputFormat<LongWritable, LongWritable, NullWritable> {
+  /** Separator between tokens on a line: a single tab or space */
+  private static final Pattern SEPARATOR = Pattern.compile("[\t ]");
+
+  @Override
+  public TextVertexReader createVertexReader(InputSplit split,
+                                             TaskAttemptContext context)
+    throws IOException {
+    return new LongLongNullVertexReader();
+  }
+
+  /**
+   * Vertex reader associated with {@link LongLongNullTextInputFormat}.
+   */
+  public class LongLongNullVertexReader extends
+      TextVertexReaderFromEachLineProcessed<String[]> {
+    /** Id parsed from the current line; also returned as the vertex value */
+    private LongWritable id;
+
+    @Override
+    protected String[] preprocessLine(Text line) throws IOException {
+      String[] tokens = SEPARATOR.split(line.toString());
+      id = new LongWritable(Long.parseLong(tokens[0]));
+      return tokens;
+    }
+
+    @Override
+    protected LongWritable getId(String[] tokens) throws IOException {
+      return id;
+    }
+
+    @Override
+    protected LongWritable getValue(String[] tokens) throws IOException {
+      return id;
+    }
+
+    @Override
+    protected Iterable<Edge<LongWritable, NullWritable>> getEdges(
+        String[] tokens) throws IOException {
+      List<Edge<LongWritable, NullWritable>> edges =
+          Lists.newArrayListWithCapacity(tokens.length - 1);
+      for (int n = 1; n < tokens.length; n++) {
+        edges.add(EdgeFactory.create(
+            new LongWritable(Long.parseLong(tokens[n]))));
+      }
+      return edges;
+    }
+  }
+}