You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ni...@apache.org on 2013/06/20 20:24:12 UTC
[1/2] GIRAPH-683: Jython for Computation (nitay)
Updated Branches:
refs/heads/trunk 1eaddd183 -> 8f89bd85a
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java b/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
index de17157..486c6db 100644
--- a/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
+++ b/giraph-core/src/main/java/org/apache/giraph/job/GiraphConfigurationValidator.java
@@ -19,7 +19,6 @@
package org.apache.giraph.job;
import org.apache.giraph.combiner.Combiner;
-import org.apache.giraph.graph.Computation;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.OutEdges;
@@ -30,17 +29,14 @@ import org.apache.giraph.graph.VertexValueFactory;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexOutputFormat;
-import org.apache.giraph.utils.ReflectionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;
-import java.lang.reflect.Type;
-import java.util.List;
-
import static org.apache.giraph.conf.GiraphConstants.VERTEX_EDGES_CLASS;
import static org.apache.giraph.conf.GiraphConstants.VERTEX_RESOLVER_CLASS;
+import static org.apache.giraph.utils.ReflectionUtils.getTypeArguments;
/**
* GiraphConfigurationValidator attempts to verify the consistency of
@@ -69,26 +65,15 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
private static final int VALUE_PARAM_INDEX = 1;
/** E param vertex index in classList */
private static final int EDGE_PARAM_INDEX = 2;
- /** M2 param vertex index in classList */
- private static final int OUTGOING_MSG_PARAM_INDEX = 4;
/** M param vertex combiner index in classList */
private static final int MSG_COMBINER_PARAM_INDEX = 1;
/** E param edge input format index in classList */
private static final int EDGE_PARAM_EDGE_INPUT_FORMAT_INDEX = 1;
/** E param vertex edges index in classList */
- private static final int EDGE_PARAM_VERTEX_EDGES_INDEX = 1;
+ private static final int EDGE_PARAM_OUT_EDGES_INDEX = 1;
/** V param vertex value factory index in classList */
private static final int VALUE_PARAM_VERTEX_VALUE_FACTORY_INDEX = 0;
- /** Vertex Index Type */
- private Type vertexIndexType;
- /** Vertex Value Type */
- private Type vertexValueType;
- /** Edge Value Type */
- private Type edgeValueType;
- /** Outgoing Message Type */
- private Type outgoingMessageValueType;
-
/**
* The Configuration object for use in the validation test.
*/
@@ -105,6 +90,42 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
}
/**
+ * Get the vertex id class from the configured GiraphTypes
+ *
+ * @return vertex id class
+ */
+ private Class<? extends WritableComparable> vertexIndexType() {
+ return conf.getGiraphTypes().getVertexIdClass();
+ }
+
+ /**
+ * Get the vertex value class from the configured GiraphTypes
+ *
+ * @return vertex value class
+ */
+ private Class<? extends Writable> vertexValueType() {
+ return conf.getGiraphTypes().getVertexValueClass();
+ }
+
+ /**
+ * Get the edge value class from the configured GiraphTypes
+ *
+ * @return edge value class
+ */
+ private Class<? extends Writable> edgeValueType() {
+ return conf.getGiraphTypes().getEdgeValueClass();
+ }
+
+ /**
+ * Get the outgoing message value class from the configured GiraphTypes
+ *
+ * @return outgoing message value class
+ */
+ private Class<? extends Writable> outgoingMessageValueType() {
+ return conf.getGiraphTypes().getOutgoingMessageValueClass();
+ }
+
+ /**
* Make sure that all registered classes have matching types. This
* is a little tricky due to type erasure, cannot simply get them from
* the class type arguments. Also, set the vertex index, vertex value,
@@ -112,14 +133,6 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
*/
public void validateConfiguration() {
checkConfiguration();
- Class<? extends Computation<I, V, E, M1, M2>> computationClass =
- conf.getComputationClass();
- List<Class<?>> classList = ReflectionUtils.getTypeArguments(
- Computation.class, computationClass);
- vertexIndexType = classList.get(ID_PARAM_INDEX);
- vertexValueType = classList.get(VALUE_PARAM_INDEX);
- edgeValueType = classList.get(EDGE_PARAM_INDEX);
- outgoingMessageValueType = classList.get(OUTGOING_MSG_PARAM_INDEX);
verifyOutEdgesGenericTypes();
verifyVertexInputFormatGenericTypes();
verifyEdgeInputFormatGenericTypes();
@@ -148,10 +161,7 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
throw new IllegalArgumentException("checkConfiguration: No valid " +
GiraphConstants.MIN_WORKERS);
}
- if (conf.getComputationClass() == null) {
- throw new IllegalArgumentException("checkConfiguration: Null " +
- GiraphConstants.COMPUTATION_CLASS.getKey());
- }
+ conf.createComputationFactory().checkConfiguration(conf);
if (conf.getVertexInputFormatClass() == null &&
conf.getEdgeInputFormatClass() == null) {
throw new IllegalArgumentException("checkConfiguration: One of " +
@@ -183,28 +193,11 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
*/
private void verifyOutEdgesGenericTypesClass(
Class<? extends OutEdges<I, E>> outEdgesClass) {
- List<Class<?>> classList = ReflectionUtils.getTypeArguments(
- OutEdges.class, outEdgesClass);
- // OutEdges implementations can be generic, in which case there are no
- // types to check.
- if (classList.isEmpty()) {
- return;
- }
- if (classList.get(ID_PARAM_INDEX) != null &&
- !vertexIndexType.equals(classList.get(ID_PARAM_INDEX))) {
- throw new IllegalArgumentException(
- "checkClassTypes: Vertex index types don't match, " +
- "vertex - " + vertexIndexType +
- ", vertex edges - " + classList.get(ID_PARAM_INDEX));
- }
- if (classList.get(EDGE_PARAM_VERTEX_EDGES_INDEX) != null &&
- !edgeValueType.equals(classList.get(EDGE_PARAM_VERTEX_EDGES_INDEX))) {
- throw new IllegalArgumentException(
- "checkClassTypes: Edge value types don't match, " +
- "vertex - " + edgeValueType +
- ", vertex edges - " +
- classList.get(EDGE_PARAM_VERTEX_EDGES_INDEX));
- }
+ Class<?>[] classList = getTypeArguments(OutEdges.class, outEdgesClass);
+ checkAssignable(classList, ID_PARAM_INDEX, vertexIndexType(),
+ OutEdges.class, "vertex index");
+ checkAssignable(classList, EDGE_PARAM_OUT_EDGES_INDEX, edgeValueType(),
+ OutEdges.class, "edge value");
}
/** Verify matching generic types in OutEdges. */
@@ -222,33 +215,14 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
Class<? extends VertexInputFormat<I, V, E>> vertexInputFormatClass =
conf.getVertexInputFormatClass();
if (vertexInputFormatClass != null) {
- List<Class<?>> classList =
- ReflectionUtils.getTypeArguments(
- VertexInputFormat.class, vertexInputFormatClass);
- if (classList.get(ID_PARAM_INDEX) == null) {
- LOG.warn("Input format vertex index type is not known");
- } else if (!vertexIndexType.equals(classList.get(ID_PARAM_INDEX))) {
- throw new IllegalArgumentException(
- "checkClassTypes: Vertex index types don't match, " +
- "vertex - " + vertexIndexType +
- ", vertex input format - " + classList.get(ID_PARAM_INDEX));
- }
- if (classList.get(VALUE_PARAM_INDEX) == null) {
- LOG.warn("Input format vertex value type is not known");
- } else if (!vertexValueType.equals(classList.get(VALUE_PARAM_INDEX))) {
- throw new IllegalArgumentException(
- "checkClassTypes: Vertex value types don't match, " +
- "vertex - " + vertexValueType +
- ", vertex input format - " + classList.get(VALUE_PARAM_INDEX));
- }
- if (classList.get(EDGE_PARAM_INDEX) == null) {
- LOG.warn("Input format edge value type is not known");
- } else if (!edgeValueType.equals(classList.get(EDGE_PARAM_INDEX))) {
- throw new IllegalArgumentException(
- "checkClassTypes: Edge value types don't match, " +
- "vertex - " + edgeValueType +
- ", vertex input format - " + classList.get(EDGE_PARAM_INDEX));
- }
+ Class<?>[] classList =
+ getTypeArguments(VertexInputFormat.class, vertexInputFormatClass);
+ checkAssignable(classList, ID_PARAM_INDEX, vertexIndexType(),
+ VertexInputFormat.class, "vertex index");
+ checkAssignable(classList, VALUE_PARAM_INDEX, vertexValueType(),
+ VertexInputFormat.class, "vertex value");
+ checkAssignable(classList, EDGE_PARAM_INDEX, edgeValueType(),
+ VertexInputFormat.class, "edge value");
}
}
@@ -257,27 +231,12 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
Class<? extends EdgeInputFormat<I, E>> edgeInputFormatClass =
conf.getEdgeInputFormatClass();
if (edgeInputFormatClass != null) {
- List<Class<?>> classList =
- ReflectionUtils.getTypeArguments(
- EdgeInputFormat.class, edgeInputFormatClass);
- if (classList.get(ID_PARAM_INDEX) == null) {
- LOG.warn("Input format vertex index type is not known");
- } else if (!vertexIndexType.equals(classList.get(ID_PARAM_INDEX))) {
- throw new IllegalArgumentException(
- "checkClassTypes: Vertex index types don't match, " +
- "vertex - " + vertexIndexType +
- ", edge input format - " + classList.get(ID_PARAM_INDEX));
- }
- if (classList.get(EDGE_PARAM_EDGE_INPUT_FORMAT_INDEX) == null) {
- LOG.warn("Input format edge value type is not known");
- } else if (!edgeValueType.equals(
- classList.get(EDGE_PARAM_EDGE_INPUT_FORMAT_INDEX))) {
- throw new IllegalArgumentException(
- "checkClassTypes: Edge value types don't match, " +
- "vertex - " + edgeValueType +
- ", edge input format - " +
- classList.get(EDGE_PARAM_EDGE_INPUT_FORMAT_INDEX));
- }
+ Class<?>[] classList =
+ getTypeArguments(EdgeInputFormat.class, edgeInputFormatClass);
+ checkAssignable(classList, ID_PARAM_INDEX,
+ vertexIndexType(), EdgeInputFormat.class, "vertex index");
+ checkAssignable(classList, EDGE_PARAM_EDGE_INPUT_FORMAT_INDEX,
+ edgeValueType(), EdgeInputFormat.class, "edge value");
}
}
@@ -286,22 +245,12 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
Class<? extends Combiner<I, M2>> vertexCombinerClass =
conf.getCombinerClass();
if (vertexCombinerClass != null) {
- List<Class<?>> classList =
- ReflectionUtils.getTypeArguments(
- Combiner.class, vertexCombinerClass);
- if (!vertexIndexType.equals(classList.get(ID_PARAM_INDEX))) {
- throw new IllegalArgumentException(
- "checkClassTypes: Vertex index types don't match, " +
- "vertex - " + vertexIndexType +
- ", vertex combiner - " + classList.get(ID_PARAM_INDEX));
- }
- if (!outgoingMessageValueType.equals(
- classList.get(MSG_COMBINER_PARAM_INDEX))) {
- throw new IllegalArgumentException(
- "checkClassTypes: Message value types don't match, " +
- "vertex - " + outgoingMessageValueType +
- ", vertex combiner - " + classList.get(MSG_COMBINER_PARAM_INDEX));
- }
+ Class<?>[] classList =
+ getTypeArguments(Combiner.class, vertexCombinerClass);
+ checkEquals(classList, ID_PARAM_INDEX, vertexIndexType(),
+ Combiner.class, "vertex index");
+ checkEquals(classList, MSG_COMBINER_PARAM_INDEX,
+ outgoingMessageValueType(), Combiner.class, "message value");
}
}
@@ -310,33 +259,14 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
Class<? extends VertexOutputFormat<I, V, E>>
vertexOutputFormatClass = conf.getVertexOutputFormatClass();
if (vertexOutputFormatClass != null) {
- List<Class<?>> classList =
- ReflectionUtils.getTypeArguments(
- VertexOutputFormat.class, vertexOutputFormatClass);
- if (classList.get(ID_PARAM_INDEX) == null) {
- LOG.warn("Output format vertex index type is not known");
- } else if (!vertexIndexType.equals(classList.get(ID_PARAM_INDEX))) {
- throw new IllegalArgumentException(
- "checkClassTypes: Vertex index types don't match, " +
- "vertex - " + vertexIndexType +
- ", vertex output format - " + classList.get(ID_PARAM_INDEX));
- }
- if (classList.get(VALUE_PARAM_INDEX) == null) {
- LOG.warn("Output format vertex value type is not known");
- } else if (!vertexValueType.equals(classList.get(VALUE_PARAM_INDEX))) {
- throw new IllegalArgumentException(
- "checkClassTypes: Vertex value types don't match, " +
- "vertex - " + vertexValueType +
- ", vertex output format - " + classList.get(VALUE_PARAM_INDEX));
- }
- if (classList.get(EDGE_PARAM_INDEX) == null) {
- LOG.warn("Output format edge value type is not known");
- } else if (!edgeValueType.equals(classList.get(EDGE_PARAM_INDEX))) {
- throw new IllegalArgumentException(
- "checkClassTypes: Edge value types don't match, " +
- "vertex - " + edgeValueType +
- ", vertex output format - " + classList.get(EDGE_PARAM_INDEX));
- }
+ Class<?>[] classList =
+ getTypeArguments(VertexOutputFormat.class, vertexOutputFormatClass);
+ checkAssignable(classList, ID_PARAM_INDEX, vertexIndexType(),
+ VertexOutputFormat.class, "vertex index");
+ checkAssignable(classList, VALUE_PARAM_INDEX, vertexValueType(),
+ VertexOutputFormat.class, "vertex value");
+ checkAssignable(classList, EDGE_PARAM_INDEX, edgeValueType(),
+ VertexOutputFormat.class, "edge value");
}
}
@@ -347,53 +277,92 @@ public class GiraphConfigurationValidator<I extends WritableComparable,
if (DefaultVertexValueFactory.class.equals(vvfClass)) {
return;
}
- List<Class<?>> classList = ReflectionUtils.getTypeArguments(
- VertexValueFactory.class, vvfClass);
- if (classList.get(VALUE_PARAM_VERTEX_VALUE_FACTORY_INDEX) != null &&
- !vertexValueType.equals(
- classList.get(VALUE_PARAM_VERTEX_VALUE_FACTORY_INDEX))) {
- throw new IllegalArgumentException(
- "checkClassTypes: Vertex value types don't match, " +
- "vertex - " + vertexValueType +
- ", vertex value factory - " +
- classList.get(VALUE_PARAM_VERTEX_VALUE_FACTORY_INDEX));
- }
+ Class<?>[] classList = getTypeArguments(VertexValueFactory.class, vvfClass);
+ checkEquals(classList, VALUE_PARAM_VERTEX_VALUE_FACTORY_INDEX,
+ vertexValueType(), VertexValueFactory.class, "vertex value");
}
- /** If there is a vertex resolver,
- * validate the generic parameter types. */
+ /**
+ * If a non-default vertex resolver is configured, validate that its
+ * generic parameter types equal the configured graph types.
+ */
private void verifyVertexResolverGenericTypes() {
Class<? extends VertexResolver<I, V, E>>
- vrClass = conf.getVertexResolverClass();
- if (!DefaultVertexResolver.class.isAssignableFrom(vrClass)) {
+ vrClass = conf.getVertexResolverClass();
+ if (DefaultVertexResolver.class.equals(vrClass)) {
return;
}
- Class<? extends DefaultVertexResolver<I, V, E>>
- dvrClass =
- (Class<? extends DefaultVertexResolver<I, V, E>>) vrClass;
- List<Class<?>> classList =
- ReflectionUtils.getTypeArguments(
- DefaultVertexResolver.class, dvrClass);
- if (classList.get(ID_PARAM_INDEX) != null &&
- !vertexIndexType.equals(classList.get(ID_PARAM_INDEX))) {
- throw new IllegalArgumentException(
- "checkClassTypes: Vertex index types don't match, " +
- "vertex - " + vertexIndexType +
- ", vertex resolver - " + classList.get(ID_PARAM_INDEX));
- }
- if (classList.get(VALUE_PARAM_INDEX) != null &&
- !vertexValueType.equals(classList.get(VALUE_PARAM_INDEX))) {
+ Class<?>[] classList =
+ getTypeArguments(VertexResolver.class, vrClass);
+ checkEquals(classList, ID_PARAM_INDEX, vertexIndexType(),
+ VertexResolver.class, "vertex index");
+ checkEquals(classList, VALUE_PARAM_INDEX, vertexValueType(),
+ VertexResolver.class, "vertex value");
+ checkEquals(classList, EDGE_PARAM_INDEX, edgeValueType(),
+ VertexResolver.class, "edge value");
+ }
+
+ /**
+ * Check that the type from computation equals the type from the class.
+ * If the type argument could not be resolved (null array, index out of
+ * range, or null entry) only a warning is logged.
+ *
+ * @param classList classes from type; may be null
+ * @param index array index of class to check
+ * @param classFromComputation class from computation
+ * @param klass Class type we're checking, only used for printing name
+ * @param typeName Name of type we're checking
+ * @throws IllegalArgumentException if the resolved type is not equal to
+ * classFromComputation
+ */
+ private static void checkEquals(Class<?>[] classList, int index,
+ Class<?> classFromComputation, Class<?> klass, String typeName) {
+ if (!isTypeKnown(classList, index)) {
+ LOG.warn(klass.getSimpleName() + " " + typeName + " type is not known");
+ } else if (!classList[index].equals(classFromComputation)) {
throw new IllegalArgumentException(
- "checkClassTypes: Vertex value types don't match, " +
- "vertex - " + vertexValueType +
- ", vertex resolver - " + classList.get(VALUE_PARAM_INDEX));
+ "checkClassTypes: " + typeName + " types not equal, " +
+ "computation - " + classFromComputation +
+ ", " + klass.getSimpleName() + " - " +
+ classList[index]);
}
- if (classList.get(EDGE_PARAM_INDEX) != null &&
- !edgeValueType.equals(classList.get(EDGE_PARAM_INDEX))) {
+ }
+
+ /**
+ * Check that the type from computation is assignable to type from the class.
+ * If the type argument could not be resolved (null array, index out of
+ * range, or null entry) only a warning is logged.
+ *
+ * @param classList classes from type; may be null
+ * @param index array index of class to check
+ * @param classFromComputation class from computation
+ * @param klass Class type we're checking, only used for printing name
+ * @param typeName Name of type we're checking
+ * @throws IllegalArgumentException if classFromComputation is not
+ * assignable to the resolved type
+ */
+ private static void checkAssignable(Class<?>[] classList, int index,
+ Class<?> classFromComputation, Class<?> klass, String typeName) {
+ if (!isTypeKnown(classList, index)) {
+ LOG.warn(klass.getSimpleName() + " " + typeName + " type is not known");
+ } else if (!classList[index].isAssignableFrom(classFromComputation)) {
throw new IllegalArgumentException(
- "checkClassTypes: Edge value types don't match, " +
- "vertex - " + edgeValueType +
- ", vertex resolver - " + classList.get(EDGE_PARAM_INDEX));
+ "checkClassTypes: " + typeName + " types not assignable, " +
+ "computation - " + classFromComputation +
+ ", " + klass.getSimpleName() + " - " +
+ classList[index]);
}
}
+
+ /**
+ * Whether the type argument at index was resolved.
+ *
+ * @param classList classes from type; may be null
+ * @param index array index of class to check
+ * @return true if classList has a non-null entry at index
+ */
+ private static boolean isTypeKnown(Class<?>[] classList, int index) {
+ return classList != null && index >= 0 && index < classList.length &&
+ classList[index] != null;
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/jython/DeployType.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/jython/DeployType.java b/giraph-core/src/main/java/org/apache/giraph/jython/DeployType.java
new file mode 100644
index 0000000..d916119
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/jython/DeployType.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.jython;
+
+/**
+ * How a file, such as a Jython script, is deployed to the job
+ */
+public enum DeployType {
+ /** Loaded as a classpath resource packaged with the jar */
+ RESOURCE,
+ /** Hadoop's DistributedCache; read from the local cached copy */
+ DISTRIBUTED_CACHE
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/jython/JythonComputationFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/jython/JythonComputationFactory.java b/giraph-core/src/main/java/org/apache/giraph/jython/JythonComputationFactory.java
new file mode 100644
index 0000000..6db5f65
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/jython/JythonComputationFactory.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.jython;
+
+import org.apache.giraph.conf.EnumConfOption;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.StrConfOption;
+import org.apache.giraph.graph.Computation;
+import org.apache.giraph.graph.ComputationFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+import org.python.core.PyObject;
+import org.python.util.PythonInterpreter;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.io.Closeables;
+
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import static org.apache.giraph.utils.DistributedCacheUtils.getLocalCacheFile;
+
+/**
+ * Factory for creating Jython Computation from python scripts.
+ * The script location is read from {@link #JYTHON_SCRIPT_PATH}, how it is
+ * deployed from {@link #JYTHON_DEPLOY_TYPE}, and the name of the Python
+ * Computation class from {@link #JYTHON_COMPUTATION_CLASS}.
+ */
+public class JythonComputationFactory implements ComputationFactory {
+ /** How the Jython script is deployed; defaults to DistributedCache */
+ public static final EnumConfOption<DeployType> JYTHON_DEPLOY_TYPE =
+ EnumConfOption.create("giraph.jython.deploy.type",
+ DeployType.class, DeployType.DISTRIBUTED_CACHE);
+ /** Path to Jython script */
+ public static final StrConfOption JYTHON_SCRIPT_PATH =
+ new StrConfOption("giraph.jython.path", "_script_not_set_");
+ /** Name of Computation class in Jython script */
+ public static final StrConfOption JYTHON_COMPUTATION_CLASS =
+ new StrConfOption("giraph.jython.class", "_computation_class_not_set_");
+
+ /** Logger */
+ private static final Logger LOG =
+ Logger.getLogger(JythonComputationFactory.class);
+
+ /**
+ * Load the Jython script and store the Python Computation class it
+ * defines so that {@link #getComputation} can instantiate it.
+ *
+ * @param conf Configuration
+ * @throws IllegalStateException if the script does not define the
+ * configured Computation class
+ */
+ @Override
+ public void initComputation(ImmutableClassesGiraphConfiguration conf) {
+ String scriptPath = JYTHON_SCRIPT_PATH.get(conf);
+ InputStream pythonStream = getPythonScriptStream(conf, scriptPath);
+ try {
+ PythonInterpreter interpreter = new PythonInterpreter();
+ if (LOG.isInfoEnabled()) {
+ LOG.info("initComputation: Jython loading script " + scriptPath);
+ }
+ interpreter.execfile(pythonStream);
+
+ String className = computationName(conf);
+ PyObject pyComputationModule = interpreter.get(className);
+ // PythonInterpreter.get returns null for an undefined name; fail here
+ // with a clear message rather than with a bare NPE in getComputation.
+ if (pyComputationModule == null) {
+ throw new IllegalStateException("initComputation: Jython script " +
+ scriptPath + " does not define " + className);
+ }
+
+ JythonUtils.setPythonComputationModule(pyComputationModule);
+ } finally {
+ Closeables.closeQuietly(pythonStream);
+ }
+ }
+
+ /**
+ * Get an {@link InputStream} for the jython script.
+ * For {@link DeployType#RESOURCE} the path is passed to
+ * {@link Class#getResourceAsStream}, so a path without a leading '/' is
+ * resolved relative to this class's package.
+ *
+ * @param conf Configuration
+ * @param path script path
+ * @return {@link InputStream} for reading script
+ * @throws IllegalStateException if the script cannot be found or opened
+ * @throws IllegalArgumentException if the deploy type is unknown
+ */
+ private InputStream getPythonScriptStream(Configuration conf,
+ String path) {
+ InputStream stream = null;
+ DeployType deployType = JYTHON_DEPLOY_TYPE.get(conf);
+ switch (deployType) {
+ case RESOURCE:
+ if (LOG.isInfoEnabled()) {
+ LOG.info("getPythonScriptStream: Reading Jython Computation " +
+ "from resource at " + path);
+ }
+ stream = getClass().getResourceAsStream(path);
+ if (stream == null) {
+ throw new IllegalStateException("getPythonScriptStream: Failed to " +
+ "open Jython script from resource at " + path);
+ }
+ break;
+ case DISTRIBUTED_CACHE:
+ if (LOG.isInfoEnabled()) {
+ LOG.info("getPythonScriptStream: Reading Jython Computation " +
+ "from DistributedCache at " + path);
+ }
+ Optional<Path> localPath = getLocalCacheFile(conf, path);
+ if (!localPath.isPresent()) {
+ throw new IllegalStateException("getPythonScriptStream: Failed to " +
+ "find Jython script in local DistributedCache matching " + path);
+ }
+ String pathStr = localPath.get().toString();
+ try {
+ stream = new BufferedInputStream(new FileInputStream(pathStr));
+ } catch (IOException e) {
+ // Keep the cause so the underlying I/O error is not lost.
+ throw new IllegalStateException("getPythonScriptStream: Failed open " +
+ "Jython script from DistributedCache at " + pathStr, e);
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("getPythonScriptStream: Unknown " +
+ "Jython script deployment type: " + deployType);
+ }
+ return stream;
+ }
+
+ /**
+ * Instantiate the Python Computation class stored by
+ * {@link #initComputation} and configure it.
+ *
+ * @param conf Configuration
+ * @return new Computation instance
+ * @throws IllegalStateException if the Python object is not a Computation
+ */
+ @Override
+ public Computation getComputation(ImmutableClassesGiraphConfiguration conf) {
+ PyObject pyComputationModule = JythonUtils.getPythonComputationModule();
+ Preconditions.checkNotNull(pyComputationModule,
+ "getComputation: Jython computation module not set; " +
+ "was initComputation called?");
+
+ PyObject pyComputationObj = pyComputationModule.__call__();
+ // __tojava__ does not return a Computation when the conversion fails,
+ // so the result type must be checked explicitly.
+ Object computationObj = pyComputationObj.__tojava__(Computation.class);
+ if (!(computationObj instanceof Computation)) {
+ throw new IllegalStateException("getComputation: Jython object " +
+ computationName(conf) + " is not a Computation");
+ }
+
+ conf.configureIfPossible(computationObj);
+ return (Computation) computationObj;
+ }
+
+ /**
+ * Verify that the script path and Computation class name are set.
+ *
+ * @param conf Configuration
+ * @throws IllegalStateException if either option has its default value
+ */
+ @Override
+ public void checkConfiguration(ImmutableClassesGiraphConfiguration conf) {
+ if (JYTHON_SCRIPT_PATH.isDefaultValue(conf)) {
+ throw new IllegalStateException("checkConfiguration: " +
+ JYTHON_SCRIPT_PATH.getKey() + " not set in configuration");
+ }
+ if (JYTHON_COMPUTATION_CLASS.isDefaultValue(conf)) {
+ throw new IllegalStateException("checkConfiguration: " +
+ JYTHON_COMPUTATION_CLASS.getKey() + " not set in configuration");
+ }
+ }
+
+ /**
+ * Name of the Python Computation class, from
+ * {@link #JYTHON_COMPUTATION_CLASS}.
+ *
+ * @param conf Configuration
+ * @return Computation class name
+ */
+ @Override
+ public String computationName(GiraphConfiguration conf) {
+ return JYTHON_COMPUTATION_CLASS.get(conf);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/jython/JythonUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/jython/JythonUtils.java b/giraph-core/src/main/java/org/apache/giraph/jython/JythonUtils.java
new file mode 100644
index 0000000..f456aa8
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/jython/JythonUtils.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.jython;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphTypes;
+import org.apache.giraph.graph.Language;
+import org.python.core.PyObject;
+
+import static org.apache.giraph.conf.GiraphConstants.COMPUTATION_FACTORY_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.COMPUTATION_LANGUAGE;
+import static org.apache.giraph.jython.JythonComputationFactory.JYTHON_COMPUTATION_CLASS;
+import static org.apache.giraph.jython.JythonComputationFactory.JYTHON_SCRIPT_PATH;
+
+/**
+ * Helpers for running jobs with Jython.
+ */
+public class JythonUtils {
+ /** Jython Computation class (PyObject), cached here for fast access */
+ private static volatile PyObject JYTHON_COMPUTATION_MODULE;
+
+ /** Static helpers only; do not construct */
+ private JythonUtils() { }
+
+ /**
+ * Store the python computation module in a static field
+ *
+ * @param mod python computation module
+ */
+ public static void setPythonComputationModule(PyObject mod) {
+ JYTHON_COMPUTATION_MODULE = mod;
+ }
+
+ /**
+ * Get the python computation module stored by setPythonComputationModule
+ *
+ * @return python computation module, or null if none has been set
+ */
+ public static PyObject getPythonComputationModule() {
+ return JYTHON_COMPUTATION_MODULE;
+ }
+
+ /**
+ * Sets computation language, factory, script path and class name in conf
+ *
+ * @param conf Configuration to set
+ * @param scriptPath Path to Jython script (resource or distributed cache)
+ * @param klassName Class name of Jython Computation
+ * @param types GiraphTypes, applied via writeIfUnset(conf)
+ */
+ public static void init(GiraphConfiguration conf, String scriptPath,
+ String klassName, GiraphTypes types) {
+ types.writeIfUnset(conf);
+ COMPUTATION_LANGUAGE.set(conf, Language.JYTHON);
+ COMPUTATION_FACTORY_CLASS.set(conf, JythonComputationFactory.class);
+ JYTHON_SCRIPT_PATH.set(conf, scriptPath);
+ JYTHON_COMPUTATION_CLASS.set(conf, klassName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/jython/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/jython/package-info.java b/giraph-core/src/main/java/org/apache/giraph/jython/package-info.java
new file mode 100644
index 0000000..4188ddb
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/jython/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Jython integration: running Giraph Computations written in Python.
+ */
+package org.apache.giraph.jython;
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java b/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java
index a12ef58..13bb492 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/SuperstepClasses.java
@@ -20,7 +20,9 @@ package org.apache.giraph.master;
import org.apache.giraph.combiner.Combiner;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.TypesHolder;
import org.apache.giraph.graph.Computation;
+import org.apache.giraph.graph.Language;
import org.apache.giraph.utils.ReflectionUtils;
import org.apache.giraph.utils.WritableUtils;
import org.apache.hadoop.io.Writable;
@@ -29,7 +31,8 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.Modifier;
-import java.util.List;
+
+import static org.apache.giraph.conf.GiraphConstants.COMPUTATION_LANGUAGE;
/**
* Holds Computation and Combiner class.
@@ -92,17 +95,25 @@ public class SuperstepClasses implements Writable {
* @param conf Configuration to verify this with
*/
public void verifyTypesMatch(ImmutableClassesGiraphConfiguration conf) {
- List<Class<?>> computationTypes = ReflectionUtils.getTypeArguments(
- Computation.class, computationClass);
- verifyTypes(conf.getVertexIdClass(), computationTypes.get(0),
+ // In some cases, for example when using Jython, the Computation class may
+ // not be set. This is because it is created by a ComputationFactory
+ // dynamically and not known ahead of time. In this case there is nothing to
+ // verify here so we bail.
+ if (COMPUTATION_LANGUAGE.get(conf) == Language.JYTHON) {
+ return;
+ }
+
+ Class<?>[] computationTypes = ReflectionUtils.getTypeArguments(
+ TypesHolder.class, computationClass);
+ verifyTypes(conf.getVertexIdClass(), computationTypes[0],
"Vertex id", computationClass);
- verifyTypes(conf.getVertexValueClass(), computationTypes.get(1),
+ verifyTypes(conf.getVertexValueClass(), computationTypes[1],
"Vertex value", computationClass);
- verifyTypes(conf.getEdgeValueClass(), computationTypes.get(2),
+ verifyTypes(conf.getEdgeValueClass(), computationTypes[2],
"Edge value", computationClass);
- verifyTypes(conf.getOutgoingMessageValueClass(), computationTypes.get(3),
+ verifyTypes(conf.getOutgoingMessageValueClass(), computationTypes[3],
"Previous outgoing and new incoming message", computationClass);
- Class<?> outgoingMessageType = computationTypes.get(4);
+ Class<?> outgoingMessageType = computationTypes[4];
if (outgoingMessageType.isInterface()) {
throw new IllegalStateException("verifyTypesMatch: " +
"Message type must be concrete class " + outgoingMessageType);
@@ -112,11 +123,11 @@ public class SuperstepClasses implements Writable {
"Message type can't be abstract class" + outgoingMessageType);
}
if (combinerClass != null) {
- List<Class<?>> combinerTypes = ReflectionUtils.getTypeArguments(
+ Class<?>[] combinerTypes = ReflectionUtils.getTypeArguments(
Combiner.class, combinerClass);
- verifyTypes(conf.getVertexIdClass(), combinerTypes.get(0),
+ verifyTypes(conf.getVertexIdClass(), combinerTypes[0],
"Vertex id", combinerClass);
- verifyTypes(outgoingMessageType, combinerTypes.get(1),
+ verifyTypes(outgoingMessageType, combinerTypes[1],
"Outgoing message", combinerClass);
}
}
@@ -154,7 +165,9 @@ public class SuperstepClasses implements Writable {
@Override
public String toString() {
- return "(computation=" + computationClass.getName() + ",combiner=" +
+ String computationName = computationClass == null ? "_not_set_" :
+ computationClass.getName();
+ return "(computation=" + computationName + ",combiner=" +
((combinerClass == null) ? "null" : combinerClass.getName()) + ")";
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
index d8b121b..f0a7e4f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
@@ -17,8 +17,6 @@
*/
package org.apache.giraph.utils;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
@@ -28,30 +26,39 @@ import org.apache.commons.cli.ParseException;
import org.apache.giraph.Algorithm;
import org.apache.giraph.aggregators.AggregatorWriter;
import org.apache.giraph.combiner.Combiner;
-import org.apache.giraph.graph.Computation;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.GiraphTypes;
+import org.apache.giraph.conf.TypesHolder;
import org.apache.giraph.edge.OutEdges;
+import org.apache.giraph.graph.Computation;
import org.apache.giraph.graph.VertexValueFactory;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexOutputFormat;
import org.apache.giraph.io.formats.GiraphFileInputFormat;
import org.apache.giraph.job.GiraphConfigurationValidator;
+import org.apache.giraph.jython.JythonUtils;
import org.apache.giraph.master.MasterCompute;
import org.apache.giraph.partition.Partition;
-import org.apache.giraph.graph.Vertex;
import org.apache.giraph.worker.WorkerContext;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.zookeeper.ZooKeeper;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+
import java.io.IOException;
import java.util.List;
+import static org.apache.giraph.conf.GiraphConstants.COMPUTATION_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.TYPES_HOLDER_CLASS;
+
/**
* Translate command line args into Configuration Key-Value pairs.
*/
@@ -100,6 +107,10 @@ public final class ConfigurationUtils {
OPTIONS.addOption("pc", "partitionClass", true, "Partition class");
OPTIONS.addOption("vvf", "vertexValueFactoryClass", true,
"Vertex value factory class");
+ OPTIONS.addOption("th", "typesHolder", true,
+ "Class that holds types. Needed only if Computation is not set");
+ OPTIONS.addOption("jyc", "jythonClass", true,
+ "Jython class name, used if computation passed in is a python script");
OPTIONS.addOption("ca", "customArguments", true, "provide custom" +
" arguments for the job configuration in the form:" +
" -ca <param1>=<value1>,<param2>=<value2> -ca <param3>=<value3> etc." +
@@ -113,6 +124,29 @@ public final class ConfigurationUtils {
private ConfigurationUtils() { }
/**
+ * Get a class which is parameterized by the graph types defined by user.
+ * The types holder is actually an interface that any class which holds all of
+ * Giraph types can implement. It is used with reflection to infer the Giraph
+ * types.
+ *
+ * The current order of type holders we try are:
+ * 1) The {@link TypesHolder} class directly.
+ * 2) The {@link Computation} class, as that holds all the types.
+ *
+ * @param conf Configuration
+ * @return {@link TypesHolder} or null if could not find one.
+ */
+  public static Class<? extends TypesHolder> getTypesHolderClass(
+      Configuration conf) {
+    Class<? extends TypesHolder> holderClass = TYPES_HOLDER_CLASS.get(conf);
+    if (holderClass == null) {
+      // Computation is parameterized by every graph type, so it can stand in
+      holderClass = COMPUTATION_CLASS.get(conf);
+    }
+    return holderClass;
+  }
+
+ /**
* Translate CLI arguments to GiraphRunner or 'bin/hadoop jar' into
* Configuration Key-Value pairs.
* @param giraphConf the current job Configuration.
@@ -199,52 +233,55 @@ public final class ConfigurationUtils {
* Populate GiraphConfiguration for this job with all cmd line args found.
* Any global configuration data that Giraph on any platform might need
* should be captured here.
- * @param giraphConfiguration config for this job run
- * @param cmd parsed command line options to store in giraphConfiguration
+ *
+ * @param conf config for this job run
+ * @param cmd parsed command line options to store in conf
* @param computationClassName the computation class (application) to run in
* this job.
* @param workers the number of worker tasks for this job run.
*/
private static void populateGiraphConfiguration(final GiraphConfiguration
- giraphConfiguration, final CommandLine cmd,
- final String computationClassName,
- final int workers) throws ClassNotFoundException, IOException {
- giraphConfiguration.setWorkerConfiguration(workers, workers, 100.0f);
- giraphConfiguration.setComputationClass(
- (Class<? extends Computation>) Class.forName(computationClassName));
+ conf, final CommandLine cmd,
+ final String computationClassName, final int workers)
+ throws ClassNotFoundException, IOException {
+ conf.setWorkerConfiguration(workers, workers, 100.0f);
+ if (cmd.hasOption("typesHolder")) {
+ Class<? extends TypesHolder> typesHolderClass =
+ (Class<? extends TypesHolder>)
+ Class.forName(cmd.getOptionValue("typesHolder"));
+ TYPES_HOLDER_CLASS.set(conf, typesHolderClass);
+ }
if (cmd.hasOption("c")) {
- giraphConfiguration.setCombinerClass(
+ conf.setCombinerClass(
(Class<? extends Combiner>) Class.forName(cmd.getOptionValue("c")));
}
if (cmd.hasOption("ve")) {
- giraphConfiguration.setOutEdgesClass(
- (Class<? extends OutEdges>)
- Class.forName(cmd.getOptionValue("ve")));
+ conf.setOutEdgesClass(
+ (Class<? extends OutEdges>) Class.forName(cmd.getOptionValue("ve")));
}
if (cmd.hasOption("ive")) {
- giraphConfiguration.setInputOutEdgesClass(
- (Class<? extends OutEdges>)
- Class.forName(cmd.getOptionValue("ive")));
+ conf.setInputOutEdgesClass(
+ (Class<? extends OutEdges>) Class.forName(cmd.getOptionValue("ive")));
}
if (cmd.hasOption("wc")) {
- giraphConfiguration.setWorkerContextClass(
- (Class<? extends WorkerContext>)
- Class.forName(cmd.getOptionValue("wc")));
+ conf.setWorkerContextClass(
+ (Class<? extends WorkerContext>) Class
+ .forName(cmd.getOptionValue("wc")));
}
if (cmd.hasOption("mc")) {
- giraphConfiguration.setMasterComputeClass(
- (Class<? extends MasterCompute>)
- Class.forName(cmd.getOptionValue("mc")));
+ conf.setMasterComputeClass(
+ (Class<? extends MasterCompute>) Class
+ .forName(cmd.getOptionValue("mc")));
}
if (cmd.hasOption("aw")) {
- giraphConfiguration.setAggregatorWriterClass(
- (Class<? extends AggregatorWriter>)
- Class.forName(cmd.getOptionValue("aw")));
+ conf.setAggregatorWriterClass(
+ (Class<? extends AggregatorWriter>) Class
+ .forName(cmd.getOptionValue("aw")));
}
if (cmd.hasOption("vif")) {
- giraphConfiguration.setVertexInputFormatClass(
- (Class<? extends VertexInputFormat>)
- Class.forName(cmd.getOptionValue("vif")));
+ conf.setVertexInputFormatClass(
+ (Class<? extends VertexInputFormat>) Class
+ .forName(cmd.getOptionValue("vif")));
} else {
if (LOG.isInfoEnabled()) {
LOG.info("No vertex input format specified. Ensure your " +
@@ -252,9 +289,9 @@ public final class ConfigurationUtils {
}
}
if (cmd.hasOption("eif")) {
- giraphConfiguration.setEdgeInputFormatClass(
- (Class<? extends EdgeInputFormat>)
- Class.forName(cmd.getOptionValue("eif")));
+ conf.setEdgeInputFormatClass(
+ (Class<? extends EdgeInputFormat>) Class
+ .forName(cmd.getOptionValue("eif")));
} else {
if (LOG.isInfoEnabled()) {
LOG.info("No edge input format specified. Ensure your " +
@@ -262,9 +299,9 @@ public final class ConfigurationUtils {
}
}
if (cmd.hasOption("of")) {
- giraphConfiguration.setVertexOutputFormatClass(
- (Class<? extends VertexOutputFormat>)
- Class.forName(cmd.getOptionValue("of")));
+ conf.setVertexOutputFormatClass(
+ (Class<? extends VertexOutputFormat>) Class
+ .forName(cmd.getOptionValue("of")));
} else {
if (LOG.isInfoEnabled()) {
LOG.info("No output format specified. Ensure your OutputFormat " +
@@ -272,14 +309,13 @@ public final class ConfigurationUtils {
}
}
if (cmd.hasOption("pc")) {
- giraphConfiguration.setPartitionClass(
- (Class<? extends Partition>)
- Class.forName(cmd.getOptionValue("pc")));
+ conf.setPartitionClass(
+ (Class<? extends Partition>) Class.forName(cmd.getOptionValue("pc")));
}
if (cmd.hasOption("vvf")) {
- giraphConfiguration.setVertexValueFactoryClass(
- (Class<? extends VertexValueFactory>)
- Class.forName(cmd.getOptionValue("vvf")));
+ conf.setVertexValueFactoryClass(
+ (Class<? extends VertexValueFactory>) Class
+ .forName(cmd.getOptionValue("vvf")));
}
if (cmd.hasOption("ca")) {
for (String caOptionValue : cmd.getOptionValues("ca")) {
@@ -295,14 +331,14 @@ public final class ConfigurationUtils {
LOG.info("Setting custom argument [" + parts[0] + "] to [" +
parts[1] + "] in GiraphConfiguration");
}
- giraphConfiguration.set(parts[0], parts[1]);
+ conf.set(parts[0], parts[1]);
}
}
}
// Now, we parse options that are specific to Hadoop MR Job
if (cmd.hasOption("vif")) {
if (cmd.hasOption("vip")) {
- GiraphFileInputFormat.addVertexInputPath(giraphConfiguration,
+ GiraphFileInputFormat.addVertexInputPath(conf,
new Path(cmd.getOptionValue("vip")));
} else {
if (LOG.isInfoEnabled()) {
@@ -313,7 +349,7 @@ public final class ConfigurationUtils {
}
if (cmd.hasOption("eif")) {
if (cmd.hasOption("eip")) {
- GiraphFileInputFormat.addEdgeInputPath(giraphConfiguration,
+ GiraphFileInputFormat.addEdgeInputPath(conf,
new Path(cmd.getOptionValue("eip")));
} else {
if (LOG.isInfoEnabled()) {
@@ -324,11 +360,11 @@ public final class ConfigurationUtils {
}
// YARN-ONLY OPTIONS
if (cmd.hasOption("yj")) {
- giraphConfiguration.setYarnLibJars(cmd.getOptionValue("yj"));
+ conf.setYarnLibJars(cmd.getOptionValue("yj"));
}
if (cmd.hasOption("yh")) {
- giraphConfiguration.setYarnTaskHeapMb(
- Integer.parseInt(cmd.getOptionValue("yh")));
+ conf.setYarnTaskHeapMb(
+ Integer.parseInt(cmd.getOptionValue("yh")));
}
/*if[PURE_YARN]
if (cmd.hasOption("of")) {
@@ -337,8 +373,8 @@ public final class ConfigurationUtils {
Path outputDir =
new Path(BASE_OUTPUT_PATH, cmd.getOptionValue("op"));
outputDir =
- outputDir.getFileSystem(giraphConfiguration).makeQualified(outputDir);
- giraphConfiguration.set(
+ outputDir.getFileSystem(conf).makeQualified(outputDir);
+ conf.set(
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR,
outputDir.toString());
@@ -351,6 +387,57 @@ public final class ConfigurationUtils {
}
end[PURE_YARN]*/
// END YARN-ONLY OPTIONS
+ handleComputationClass(conf, cmd, computationClassName);
+ }
+
+ /**
+ * Helper to deal with computation class.
+ *
+ * @param conf Configuration
+ * @param cmd CommandLine
+ * @param computationClassName Name of computation
+ * @throws ClassNotFoundException error finding class
+ */
+  private static void handleComputationClass(GiraphConfiguration conf,
+      CommandLine cmd, String computationClassName)
+    throws ClassNotFoundException {
+    // Require the ".py" extension: a bare "py" suffix would also match Java
+    // class names such as "...Copy" and wrongly route them to Jython.
+    if (computationClassName.endsWith(".py")) {
+      handleJythonComputation(conf, cmd, computationClassName);
+    } else {
+      conf.setComputationClass(
+          (Class<? extends Computation>) Class.forName(computationClassName));
+    }
+  }
+
+ /**
+ * Helper to handle Computations implemented in Python.
+ *
+ * @param conf Configuration
+ * @param cmd CommandLine
+ * @param scriptPath Path to python script
+ */
+  private static void handleJythonComputation(GiraphConfiguration conf,
+      CommandLine cmd, String scriptPath) {
+    // Both options are mandatory for scripts: there is no Java Computation
+    // class to read the class name or the graph types from.
+    String jythonClass = cmd.getOptionValue("jythonClass");
+    if (jythonClass == null) {
+      throw new IllegalArgumentException(
+          "handleJythonComputation: Need to set Jython Computation class " +
+          "name with --jythonClass");
+    }
+    if (cmd.getOptionValue("typesHolder") == null) {
+      throw new IllegalArgumentException(
+          "handleJythonComputation: Need to set TypesHolder class name " +
+          "with --typesHolder");
+    }
+
+    // Upload the script (if local) and register it in the DistributedCache.
+    DistributedCache.addCacheFile(
+        DistributedCacheUtils.copyToHdfs(new Path(scriptPath), conf).toUri(),
+        conf);
+
+    JythonUtils.init(conf, scriptPath, jythonClass, GiraphTypes.readFrom(conf));
   }
/**
@@ -372,7 +459,7 @@ public final class ConfigurationUtils {
Algorithm.class, "org.apache.giraph");
System.out.print(" Supported algorithms:\n");
for (Class<?> clazz : classes) {
- if (Vertex.class.isAssignableFrom(clazz)) {
+ if (Computation.class.isAssignableFrom(clazz)) {
Algorithm algorithm = clazz.getAnnotation(Algorithm.class);
StringBuilder sb = new StringBuilder();
sb.append(algorithm.name()).append(" - ").append(clazz.getName())
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/utils/DistributedCacheUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/DistributedCacheUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/DistributedCacheUtils.java
new file mode 100644
index 0000000..6abe89b
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/DistributedCacheUtils.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Optional;
+
+import java.io.IOException;
+
+import static org.apache.commons.io.FilenameUtils.getBaseName;
+
+/**
+ * Helpers for dealing with {@link org.apache.hadoop.filecache.DistributedCache}
+ */
+public class DistributedCacheUtils {
+  /** Logger */
+  private static final Logger LOG = Logger.getLogger(
+      DistributedCacheUtils.class);
+
+  /** Don't construct */
+  private DistributedCacheUtils() { }
+
+  /**
+   * Get local path to file from a DistributedCache. Only the file name (last
+   * path component) of pathToMatch is compared against the names of the
+   * localized cache files.
+   *
+   * @param conf Configuration
+   * @param pathToMatch Path whose file name was used to insert into the
+   *                    DistributedCache
+   * @return Path matched, or Optional.absent() if there is no match, no cache
+   *         files are registered, or the cache files cannot be listed.
+   */
+  public static Optional<Path>
+  getLocalCacheFile(Configuration conf, String pathToMatch) {
+    String nameToMatch = FilenameUtils.getName(pathToMatch);
+    Path[] paths;
+    try {
+      paths = DistributedCache.getLocalCacheFiles(conf);
+    } catch (IOException e) {
+      LOG.warn("getLocalCacheFile: Failed to list local cache files", e);
+      return Optional.absent();
+    }
+    if (paths == null) {
+      // getLocalCacheFiles() returns null when nothing has been localized
+      return Optional.absent();
+    }
+    for (Path path : paths) {
+      if (FilenameUtils.getName(path.toString()).equals(nameToMatch)) {
+        return Optional.of(path);
+      }
+    }
+    return Optional.absent();
+  }
+
+  /**
+   * Copy a file to HDFS if it is local. If the path is already in HDFS (an
+   * "hdfs://" path), it is returned unchanged and nothing is copied.
+   *
+   * The copy is placed in a fresh directory under /tmp/giraph but keeps its
+   * original file name, so it can later be found by that name with
+   * {@link #getLocalCacheFile} once added to the DistributedCache.
+   *
+   * @param path path to file
+   * @param conf Configuration
+   * @return path to file on HDFS.
+   */
+  public static Path copyToHdfs(Path path, Configuration conf) {
+    if (path.toString().startsWith("hdfs://")) {
+      // Already on HDFS
+      return path;
+    }
+
+    final FileSystem fs;
+    try {
+      fs = FileSystem.get(conf);
+    } catch (IOException e) {
+      throw new IllegalArgumentException("Failed to get HDFS FileSystem", e);
+    }
+    // A unique directory per copy avoids collisions; the file name itself
+    // (extension included) is preserved so name-based lookups still match.
+    String dirName = getBaseName(path.toString()) + "-" + System.nanoTime();
+    Path remoteDir = new Path("/tmp/giraph", dirName);
+    Path remotePath = new Path(remoteDir, path.getName());
+    LOG.info("copyToHdfs: Copying " + path + " to " +
+        remotePath + " on hdfs " + fs.getUri());
+    try {
+      fs.copyFromLocalFile(false, true, path, remotePath);
+    } catch (IOException e) {
+      throw new IllegalArgumentException(
+          "Failed to copy file from local path " + path +
+          " to hdfs path " + remotePath + " on hdfs " + fs.getUri(), e);
+    }
+    return remotePath;
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/utils/FileUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/FileUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/FileUtils.java
index 442fc9f..1bc11a8 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/FileUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/FileUtils.java
@@ -45,15 +45,15 @@ public class FileUtils {
/**
* Create a temporary folder that will be removed after the test.
*
- * @param vertexClass Used for generating the folder name.
+ * @param computationName Used for generating the folder name.
* @return File object for the directory.
*/
- public static File createTestDir(Class<?> vertexClass)
+ public static File createTestDir(String computationName)
throws IOException {
String systemTmpDir = System.getProperty("java.io.tmpdir");
long simpleRandomLong = (long) (Long.MAX_VALUE * Math.random());
File testTempDir = new File(systemTmpDir, "giraph-" +
- vertexClass.getSimpleName() + '-' + simpleRandomLong);
+ computationName + '-' + simpleRandomLong);
if (!testTempDir.mkdir()) {
throw new IOException("Could not create " + testTempDir);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
index b4920e1..9fe7663 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
@@ -101,7 +101,7 @@ public class InternalVertexRunner {
File tmpDir = null;
try {
// Prepare input file, output folder and temporary folders
- tmpDir = FileUtils.createTestDir(conf.getComputationClass());
+ tmpDir = FileUtils.createTestDir(conf.getComputationName());
File vertexInputFile = null;
File edgeInputFile = null;
@@ -137,7 +137,7 @@ public class InternalVertexRunner {
GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkpointsDir.toString());
// Create and configure the job to run the vertex
- GiraphJob job = new GiraphJob(conf, conf.getComputationClass().getName());
+ GiraphJob job = new GiraphJob(conf, conf.getComputationName());
Job internalJob = job.getInternalJob();
if (conf.hasVertexInputFormat()) {
@@ -212,7 +212,7 @@ public class InternalVertexRunner {
File tmpDir = null;
try {
// Prepare temporary folders
- tmpDir = FileUtils.createTestDir(conf.getComputationClass());
+ tmpDir = FileUtils.createTestDir(conf.getComputationName());
File zkDir = FileUtils.createTempDir(tmpDir, "_bspZooKeeper");
File zkMgrDir = FileUtils.createTempDir(tmpDir, "_defaultZkManagerDir");
@@ -221,7 +221,7 @@ public class InternalVertexRunner {
conf.setVertexInputFormatClass(InMemoryVertexInputFormat.class);
// Create and configure the job to run the vertex
- GiraphJob job = new GiraphJob(conf, conf.getComputationClass().getName());
+ GiraphJob job = new GiraphJob(conf, conf.getComputationName());
InMemoryVertexInputFormat.setGraph(graph);
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
index 96352bb..9083769 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ReflectionUtils.java
@@ -19,17 +19,7 @@
package org.apache.giraph.utils;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-
-import java.lang.reflect.Array;
-import java.lang.reflect.Field;
-import java.lang.reflect.GenericArrayType;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.lang.reflect.TypeVariable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import org.jodah.typetools.TypeResolver;
/**
* Helper methods to get type arguments to generic classes. Courtesy of
@@ -43,29 +33,23 @@ public class ReflectionUtils {
private ReflectionUtils() { }
/**
- * Get the underlying class for a type, or null if the type is
- * a variable type.
+ * Get package path to the object given. Used with resources.
*
- * @param type the type
- * @return the underlying class
+ * @param object the Object to check
+ * @return Path to package of object
*/
- public static Class<?> getClass(Type type) {
- if (type instanceof Class) {
- return (Class<?>) type;
- } else if (type instanceof ParameterizedType) {
- return getClass(((ParameterizedType) type).getRawType());
- } else if (type instanceof GenericArrayType) {
- Type componentType =
- ((GenericArrayType) type).getGenericComponentType();
- Class<?> componentClass = getClass(componentType);
- if (componentClass != null) {
- return Array.newInstance(componentClass, 0).getClass();
- } else {
- return null;
- }
- } else {
- return null;
- }
+ public static String getPackagePath(Object object) {
+ return getPackagePath(object.getClass());
+ }
+
+ /**
+ * Get package path to the class given. Used with resources.
+ *
+ * @param klass Class to check
+ * @return Path to package of class
+ */
+ public static String getPackagePath(Class klass) {
+ return "/" + klass.getPackage().getName().replaceAll("\\.", "/");
}
/**
@@ -77,94 +61,9 @@ public class ReflectionUtils {
* @param childClass the child class
* @return a list of the raw classes for the actual type arguments.
*/
- public static <T> List<Class<?>> getTypeArguments(
+ public static <T> Class<?>[] getTypeArguments(
Class<T> baseClass, Class<? extends T> childClass) {
- Map<Type, Type> resolvedTypes = new HashMap<Type, Type>();
- Type type = childClass;
- // start walking up the inheritance hierarchy until we hit baseClass
- while (! getClass(type).equals(baseClass)) {
- if (type instanceof Class) {
- Type newType = ((Class<?>) type).getGenericSuperclass();
- if (newType == null) {
- // we have reached an interface, so we stop here
- break;
- } else {
- // there is no useful information for us in raw types,
- // so just keep going.
- type = newType;
- }
-
- } else {
- ParameterizedType parameterizedType = (ParameterizedType) type;
- Class<?> rawType = (Class<?>) parameterizedType.getRawType();
-
- Type[] actualTypeArguments =
- parameterizedType.getActualTypeArguments();
- TypeVariable<?>[] typeParameters = rawType.getTypeParameters();
- for (int i = 0; i < actualTypeArguments.length; i++) {
- resolvedTypes.put(typeParameters[i],
- actualTypeArguments[i]);
- }
-
- if (!rawType.equals(baseClass)) {
- type = rawType.getGenericSuperclass();
- }
- }
- }
-
- // finally, for each actual type argument provided to baseClass,
- // determine (if possible) the raw class for that type argument.
- Type[] actualTypeArguments;
- if (type instanceof Class) {
- actualTypeArguments = ((Class<?>) type).getTypeParameters();
- } else {
- actualTypeArguments =
- ((ParameterizedType) type).getActualTypeArguments();
- }
- List<Class<?>> typeArgumentsAsClasses = new ArrayList<Class<?>>();
- // resolve types by chasing down type variables.
- for (Type baseType: actualTypeArguments) {
- while (resolvedTypes.containsKey(baseType)) {
- baseType = resolvedTypes.get(baseType);
- }
- typeArgumentsAsClasses.add(getClass(baseType));
- }
- return typeArgumentsAsClasses;
- }
-
- /**
- * Try to directly set a (possibly private) field on an Object.
- *
- * @param target Target to set the field on.
- * @param fieldname Name of field.
- * @param value Value to set on target.
- */
- public static void setField(Object target, String fieldname, Object value)
- throws NoSuchFieldException, IllegalAccessException {
- Field field = findDeclaredField(target.getClass(), fieldname);
- field.setAccessible(true);
- field.set(target, value);
- }
-
- /**
- * Find a declared field in a class or one of its super classes
- *
- * @param inClass Class to search for declared field.
- * @param fieldname Field name to search for
- * @return Field or will throw.
- * @throws NoSuchFieldException When field not found.
- */
- private static Field findDeclaredField(Class<?> inClass, String fieldname)
- throws NoSuchFieldException {
- while (!Object.class.equals(inClass)) {
- for (Field field : inClass.getDeclaredFields()) {
- if (field.getName().equalsIgnoreCase(fieldname)) {
- return field;
- }
- }
- inClass = inClass.getSuperclass();
- }
- throw new NoSuchFieldException();
+ return TypeResolver.resolveArguments(childClass, baseClass);
}
/**
@@ -199,7 +98,7 @@ public class ReflectionUtils {
public static <T> T newInstance(
Class<T> theClass,
ImmutableClassesGiraphConfiguration configuration) {
- T result = null;
+ T result;
try {
result = theClass.newInstance();
} catch (InstantiationException e) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/resources/org/apache/giraph/benchmark/page-rank.py
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/resources/org/apache/giraph/benchmark/page-rank.py b/giraph-core/src/main/resources/org/apache/giraph/benchmark/page-rank.py
new file mode 100644
index 0000000..5ff918c
--- /dev/null
+++ b/giraph-core/src/main/resources/org/apache/giraph/benchmark/page-rank.py
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from org.apache.giraph.graph import BasicComputation
+from org.apache.hadoop.io import FloatWritable
+
+class PageRank(BasicComputation):
+ SUPERSTEP_COUNT = "giraph.pageRank.superstepCount"
+
+ def compute(self, vertex, messages):
+ if self.getSuperstep() >= 1:
+ total = 0
+ for msg in messages:
+ total += msg.get()
+ vertex.getValue().set((0.15 / self.getTotalNumVertices()) + 0.85 * total)
+
+ if self.getSuperstep() < self.getConf().getInt(self.SUPERSTEP_COUNT, 0):
+ self.sendMessageToAllEdges(vertex,
+ FloatWritable(vertex.getValue().get() / vertex.getNumEdges()))
+ else:
+ vertex.voteToHalt()
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/test/java/org/apache/giraph/BspCase.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/BspCase.java b/giraph-core/src/test/java/org/apache/giraph/BspCase.java
index 137cb6e..dd2369a 100644
--- a/giraph-core/src/test/java/org/apache/giraph/BspCase.java
+++ b/giraph-core/src/test/java/org/apache/giraph/BspCase.java
@@ -30,8 +30,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.log4j.Level;
-import org.apache.log4j.LogManager;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.junit.After;
@@ -66,8 +64,8 @@ public class BspCase implements Watcher {
/** Default path for temporary files */
static final Path DEFAULT_TEMP_DIR =
new Path(System.getProperty("java.io.tmpdir"), "_giraphTests");
-
- public static final String READER_VERTICES =
+
+ public static final String READER_VERTICES_OPT =
"GeneratedVertexReader.reader_vertices";
/** A filter for listing parts files */
@@ -109,7 +107,7 @@ public class BspCase implements Watcher {
conf.setZooKeeperConfiguration(getZooKeeperList());
}
// GeneratedInputSplit will generate 5 vertices
- conf.setLong(READER_VERTICES, 5);
+ conf.setLong(READER_VERTICES_OPT, 5);
// Setup pathes for temporary files
Path zookeeperDir = getTempPath("_bspZooKeeper");
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/test/java/org/apache/giraph/jython/TestJython.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/jython/TestJython.java b/giraph-core/src/test/java/org/apache/giraph/jython/TestJython.java
new file mode 100644
index 0000000..245d342
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/jython/TestJython.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.jython;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphTypes;
+import org.apache.giraph.edge.ByteArrayEdges;
+import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
+import org.apache.giraph.io.formats.IntNullTextEdgeInputFormat;
+import org.apache.giraph.utils.InternalVertexRunner;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+
+import static org.apache.giraph.jython.JythonComputationFactory.JYTHON_DEPLOY_TYPE;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * End-to-end test that runs the count-edges.py Jython computation through
+ * {@link InternalVertexRunner} and checks the per-vertex output.
+ */
+public class TestJython {
+  @Test
+  public void testCountEdges() throws Exception {
+    String[] edges = new String[] {
+        "1 2",
+        "2 3",
+        "2 4",
+        "4 1"
+    };
+
+    GiraphConfiguration conf = new GiraphConfiguration();
+    GiraphTypes types = new GiraphTypes(IntWritable.class, IntWritable.class,
+        NullWritable.class, NullWritable.class, NullWritable.class);
+    JythonUtils.init(conf, "count-edges.py", "CountEdges", types);
+    // The script ships under src/test/resources, so deploy it as a resource
+    JYTHON_DEPLOY_TYPE.set(conf, DeployType.RESOURCE);
+    conf.setOutEdgesClass(ByteArrayEdges.class);
+    conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
+    conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+    Iterable<String> results = InternalVertexRunner.run(conf, null, edges);
+
+    Map<Integer, Integer> values = parseResults(results);
+
+    // Check that all vertices with outgoing edges have been created
+    assertEquals(3, values.size());
+    // Check the number of edges for each vertex. Compare boxed values so a
+    // missing vertex fails the assertion instead of throwing an NPE on unboxing.
+    assertEquals(Integer.valueOf(1), values.get(1));
+    assertEquals(Integer.valueOf(2), values.get(2));
+    assertEquals(Integer.valueOf(1), values.get(4));
+  }
+
+  /**
+   * Parse whitespace-separated "id value" output lines into a map.
+   *
+   * @param results lines produced by the vertex output format
+   * @return map from vertex id to vertex value
+   */
+  private static Map<Integer, Integer> parseResults(Iterable<String> results) {
+    Map<Integer, Integer> values = Maps.newHashMap();
+    for (String line : results) {
+      // trim() so leading whitespace does not produce an empty first token
+      String[] tokens = line.trim().split("\\s+");
+      values.put(Integer.parseInt(tokens[0]), Integer.parseInt(tokens[1]));
+    }
+    return values;
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/test/java/org/apache/giraph/utils/TestReflectionUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/utils/TestReflectionUtils.java b/giraph-core/src/test/java/org/apache/giraph/utils/TestReflectionUtils.java
new file mode 100644
index 0000000..c9b4ace
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/utils/TestReflectionUtils.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.utils;
+
+import org.apache.giraph.conf.TypesHolder;
+import org.apache.giraph.edge.ByteArrayEdges;
+import org.apache.giraph.edge.OutEdges;
+import org.apache.giraph.graph.BasicComputation;
+import org.apache.giraph.graph.Computation;
+import org.apache.giraph.graph.DefaultVertexResolver;
+import org.apache.giraph.graph.DefaultVertexValueFactory;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexResolver;
+import org.apache.giraph.graph.VertexValueFactory;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.giraph.utils.ReflectionUtils.getTypeArguments;
+import static org.junit.Assert.assertEquals;
+
+/** Tests for {@link ReflectionUtils}. */
+public class TestReflectionUtils {
+  /** Package path expected for classes in this package. */
+  private static final String UTILS_PACKAGE_PATH = "/org/apache/giraph/utils";
+
+  @Test
+  public void testPackagePath() {
+    // Lookup by class literal, by runtime class and by instance must agree
+    assertEquals(UTILS_PACKAGE_PATH,
+        ReflectionUtils.getPackagePath(TestReflectionUtils.class));
+    assertEquals(UTILS_PACKAGE_PATH,
+        ReflectionUtils.getPackagePath(getClass()));
+    assertEquals(UTILS_PACKAGE_PATH, ReflectionUtils.getPackagePath(this));
+  }
+
+  /** TypesHolder whose five type arguments are all IntWritable. */
+  private static class IntTypes implements TypesHolder<IntWritable,
+      IntWritable, IntWritable, IntWritable, IntWritable> { }
+
+  /** Computation whose five type arguments are all IntWritable. */
+  private static class IntComputation extends Computation<IntWritable,
+      IntWritable, IntWritable, IntWritable, IntWritable> {
+    @Override
+    public void compute(Vertex<IntWritable, IntWritable, IntWritable> vertex,
+        Iterable<IntWritable> messages) throws IOException {
+      // Intentionally empty: only the generic signature is under test
+    }
+  }
+
+  /** BasicComputation whose four type arguments are all IntWritable. */
+  private static class IntBasicComputation extends BasicComputation<IntWritable,
+      IntWritable, IntWritable, IntWritable> {
+    @Override
+    public void compute(Vertex<IntWritable, IntWritable, IntWritable> vertex,
+        Iterable<IntWritable> messages) throws IOException {
+      // Intentionally empty: only the generic signature is under test
+    }
+  }
+
+  @Test
+  public void testInferWithGenerics() {
+    // Expected results are the Writable bound types, not concrete classes
+    assertTypes(
+        getTypeArguments(VertexResolver.class, DefaultVertexResolver.class),
+        WritableComparable.class, Writable.class, Writable.class);
+    assertTypes(
+        getTypeArguments(VertexValueFactory.class,
+            DefaultVertexValueFactory.class),
+        Writable.class);
+    assertTypes(
+        getTypeArguments(OutEdges.class, ByteArrayEdges.class),
+        WritableComparable.class, Writable.class);
+  }
+
+  @Test
+  public void testInferTypeParams() {
+    checkTypes(TypesHolder.class, IntTypes.class, 5);
+    checkTypes(TypesHolder.class, IntComputation.class, 5);
+    checkTypes(Computation.class, IntComputation.class, 5);
+    checkTypes(TypesHolder.class, IntBasicComputation.class, 5);
+    checkTypes(Computation.class, IntBasicComputation.class, 5);
+    checkTypes(BasicComputation.class, IntBasicComputation.class, 4);
+  }
+
+  /**
+   * Assert that {@code actual} holds exactly the {@code expected} classes,
+   * in order.
+   *
+   * @param actual inferred type arguments
+   * @param expected expected type arguments
+   */
+  private static void assertTypes(Class<?>[] actual, Class<?>... expected) {
+    assertEquals(expected.length, actual.length);
+    for (int i = 0; i < expected.length; i++) {
+      assertEquals(expected[i], actual[i]);
+    }
+  }
+
+  /**
+   * Assert that {@code childClass} resolves {@code numArgs} type arguments
+   * of {@code baseClass}, all of them IntWritable.
+   *
+   * @param baseClass generic base type to resolve against
+   * @param childClass concrete subtype
+   * @param numArgs expected number of type arguments
+   * @param <T> base type
+   */
+  private <T> void checkTypes(Class<T> baseClass,
+      Class<? extends T> childClass, int numArgs) {
+    Class<?>[] typeArgs = getTypeArguments(baseClass, childClass);
+    assertEquals(numArgs, typeArgs.length);
+    for (Class<?> typeArg : typeArgs) {
+      assertEquals(IntWritable.class, typeArg);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/test/resources/org/apache/giraph/jython/count-edges.py
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/resources/org/apache/giraph/jython/count-edges.py b/giraph-core/src/test/resources/org/apache/giraph/jython/count-edges.py
new file mode 100644
index 0000000..4cad945
--- /dev/null
+++ b/giraph-core/src/test/resources/org/apache/giraph/jython/count-edges.py
@@ -0,0 +1,8 @@
+from org.apache.giraph.graph import BasicComputation
+from org.apache.hadoop.io import IntWritable
+
+class CountEdges(BasicComputation):
+ def compute(self, vertex, messages):
+ num_edges = vertex.getNumEdges()
+ vertex.setValue(IntWritable(num_edges))
+ vertex.voteToHalt()
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-examples/src/main/java/org/apache/giraph/examples/GeneratedVertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/GeneratedVertexReader.java b/giraph-examples/src/main/java/org/apache/giraph/examples/GeneratedVertexReader.java
index 13e8edf..1d74843 100644
--- a/giraph-examples/src/main/java/org/apache/giraph/examples/GeneratedVertexReader.java
+++ b/giraph-examples/src/main/java/org/apache/giraph/examples/GeneratedVertexReader.java
@@ -18,14 +18,17 @@
package org.apache.giraph.examples;
-import java.io.IOException;
import org.apache.giraph.bsp.BspInputSplit;
+import org.apache.giraph.conf.BooleanConfOption;
+import org.apache.giraph.conf.LongConfOption;
import org.apache.giraph.io.VertexReader;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import java.io.IOException;
+
/**
* Used by GeneratedVertexInputFormat to read some generated data
*
@@ -39,15 +42,11 @@ public abstract class GeneratedVertexReader<
E extends Writable>
extends VertexReader<I, V, E> {
/** Vertices produced by this reader */
- public static final String READER_VERTICES =
- "GeneratedVertexReader.reader_vertices";
- /** Default vertices produced by this reader */
- public static final long DEFAULT_READER_VERTICES = 10;
+ public static final LongConfOption READER_VERTICES =
+ new LongConfOption("GeneratedVertexReader.reader_vertices", 10);
/** Reverse the order of the vertices? */
- public static final String REVERSE_ID_ORDER =
- "GeneratedVertexReader.reverseIdOrder";
- /** Default ordering is not reversed */
- public static final boolean DEAFULT_REVERSE_ID_ORDER = false;
+ public static final BooleanConfOption REVERSE_ID_ORDER =
+ new BooleanConfOption("GeneratedVertexReader.reverseIdOrder", false);
/** Records read so far */
protected long recordsRead = 0;
/** Total records to read (on this split alone) */
@@ -66,12 +65,8 @@ public abstract class GeneratedVertexReader<
@Override
public final void initialize(InputSplit inputSplit,
TaskAttemptContext context) throws IOException {
- totalRecords = getConf().getLong(
- GeneratedVertexReader.READER_VERTICES,
- GeneratedVertexReader.DEFAULT_READER_VERTICES);
- reverseIdOrder = getConf().getBoolean(
- GeneratedVertexReader.REVERSE_ID_ORDER,
- GeneratedVertexReader.DEAFULT_REVERSE_ID_ORDER);
+ totalRecords = GeneratedVertexReader.READER_VERTICES.get(getConf());
+ reverseIdOrder = GeneratedVertexReader.REVERSE_ID_ORDER.get(getConf());
this.inputSplit = (BspInputSplit) inputSplit;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
index 6c8a0b3..c4286ab 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
@@ -243,7 +243,7 @@ public class
GiraphJob job = prepareJob(callingMethod, conf, outputPath);
Configuration configuration = job.getConfiguration();
// GeneratedInputSplit will generate 10 vertices
- configuration.setLong(GeneratedVertexReader.READER_VERTICES, 10);
+ GeneratedVertexReader.READER_VERTICES.set(configuration, 10);
assertTrue(job.run(true));
if (!runningInDistributedMode()) {
FileStatus fileStatus = getSinglePartFileStatus(configuration, outputPath);
@@ -284,7 +284,7 @@ public class
conf.setComputationClass(SimpleMsgComputation.class);
conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
GiraphJob job = prepareJob(getCallingMethodName(), conf);
- job.getConfiguration().setLong(GeneratedVertexReader.READER_VERTICES, 0);
+ GeneratedVertexReader.READER_VERTICES.set(job.getConfiguration(), 0);
assertTrue(job.run(true));
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-examples/src/test/java/org/apache/giraph/TestComputationState.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestComputationState.java b/giraph-examples/src/test/java/org/apache/giraph/TestComputationState.java
index 1092eac..75b98e4 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestComputationState.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestComputationState.java
@@ -54,8 +54,7 @@ public class TestComputationState extends BspCase {
job.getConfiguration().setNumComputeThreads(
TestComputationStateComputation.NUM_COMPUTE_THREADS);
// Increase the number of vertices
- job.getConfiguration().setInt(
- GeneratedVertexReader.READER_VERTICES,
+ GeneratedVertexReader.READER_VERTICES.set(job.getConfiguration(),
TestComputationStateComputation.NUM_VERTICES);
// Increase the number of partitions
GiraphConstants.USER_PARTITION_COUNT.set(job.getConfiguration(),
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java b/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java
index 4537eac..bf87491 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java
@@ -36,6 +36,7 @@ import org.junit.Test;
import java.io.IOException;
+import static org.apache.giraph.examples.GeneratedVertexReader.READER_VERTICES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -158,8 +159,7 @@ public class TestGraphPartitioner extends BspCase {
outputPath);
job.getConfiguration().setGraphPartitionerFactoryClass(
SuperstepHashPartitionerFactory.class);
- job.getConfiguration().setBoolean(
- GeneratedVertexReader.REVERSE_ID_ORDER, true);
+ GeneratedVertexReader.REVERSE_ID_ORDER.set(job.getConfiguration(), true);
assertTrue(job.run(true));
verifyOutput(hdfs, outputPath);
@@ -178,8 +178,8 @@ public class TestGraphPartitioner extends BspCase {
job.getConfiguration().setGraphPartitionerFactoryClass(
SimpleLongRangePartitionerFactory.class);
- long readerVertices = job.getConfiguration().getLong(
- GeneratedVertexReader.READER_VERTICES, -1);
+ long readerVertices =
+ READER_VERTICES.getWithDefault(job.getConfiguration(), -1L);
job.getConfiguration().setLong(
GiraphConstants.PARTITION_VERTEX_KEY_SPACE_SIZE, readerVertices);
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java b/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
index 340b461..589fed6 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
@@ -24,7 +24,6 @@ import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
-import org.apache.giraph.conf.GiraphClasses;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.graph.Computation;
@@ -635,8 +634,6 @@ public class HiveGiraphRunner implements Tool {
* @param giraphConf GiraphConfiguration
*/
private void logOptions(GiraphConfiguration giraphConf) {
- GiraphClasses<?, ?, ?> classes = new GiraphClasses(giraphConf);
-
LOG.info(getClass().getSimpleName() + " with");
LOG.info(LOG_PREFIX + "-computationClass=" +
@@ -650,7 +647,7 @@ public class HiveGiraphRunner implements Tool {
LOG.info(LOG_PREFIX + "Edge input: " + description);
}
- if (classes.getVertexOutputFormatClass() != null) {
+ if (GiraphConstants.VERTEX_OUTPUT_FORMAT_CLASS.contains(giraphConf)) {
LOG.info(LOG_PREFIX + "Output: VertexToHive=" +
vertexToHiveClass.getCanonicalName() + ", table=" +
HIVE_VERTEX_OUTPUT_TABLE.get(conf) + ", partition=\"" +
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0bd85a4..7e166c4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -255,6 +255,9 @@ under the License.
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<dep.hiveio.version>0.11</dep.hiveio.version>
+ <dep.jython.version>2.5.3</dep.jython.version>
+ <dep.typetools.version>0.2.1</dep.typetools.version>
+
<hbase.version>0.90.5</hbase.version>
<codehaus-jackson.version>1.8.0</codehaus-jackson.version>
<fasterxml-jackson.version>2.1.0</fasterxml-jackson.version>
@@ -1107,11 +1110,21 @@ under the License.
<version>${codehaus-jackson.version}</version>
</dependency>
<dependency>
+ <groupId>org.jodah</groupId>
+ <artifactId>typetools</artifactId>
+ <version>${dep.typetools.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20090211</version>
</dependency>
<dependency>
+ <groupId>org.python</groupId>
+ <artifactId>jython</artifactId>
+ <version>${dep.jython.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
[2/2] git commit: updated refs/heads/trunk to 8f89bd8
Posted by ni...@apache.org.
GIRAPH-683: Jython for Computation (nitay)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/8f89bd85
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/8f89bd85
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/8f89bd85
Branch: refs/heads/trunk
Commit: 8f89bd85a03a1fec25e21e334631931f69078040
Parents: 1eaddd1
Author: Nitay Joffe <ni...@apache.org>
Authored: Thu Jun 20 14:21:48 2013 -0400
Committer: Nitay Joffe <ni...@apache.org>
Committed: Thu Jun 20 14:22:03 2013 -0400
----------------------------------------------------------------------
CHANGELOG | 2 +
README | 19 ++
giraph-core/pom.xml | 8 +
.../giraph/benchmark/BenchmarkOption.java | 7 +
.../giraph/benchmark/PageRankBenchmark.java | 32 +-
.../apache/giraph/conf/AbstractConfOption.java | 22 ++
.../java/org/apache/giraph/conf/AllOptions.java | 3 +
.../apache/giraph/conf/BooleanConfOption.java | 17 +-
.../org/apache/giraph/conf/ClassConfOption.java | 27 ++
.../org/apache/giraph/conf/ConfOptionType.java | 4 +-
.../org/apache/giraph/conf/EnumConfOption.java | 95 ++++++
.../org/apache/giraph/conf/FloatConfOption.java | 10 +-
.../org/apache/giraph/conf/GiraphClasses.java | 105 ++++---
.../apache/giraph/conf/GiraphConfiguration.java | 35 ++-
.../org/apache/giraph/conf/GiraphConstants.java | 20 +-
.../org/apache/giraph/conf/GiraphTypes.java | 213 +++++++++++++
.../ImmutableClassesGiraphConfiguration.java | 38 ++-
.../org/apache/giraph/conf/IntConfOption.java | 11 +-
.../org/apache/giraph/conf/LongConfOption.java | 21 +-
.../org/apache/giraph/conf/StrConfOption.java | 12 +
.../org/apache/giraph/conf/TypesHolder.java | 37 +++
.../org/apache/giraph/graph/Computation.java | 9 +-
.../apache/giraph/graph/ComputationFactory.java | 69 +++++
.../giraph/graph/DefaultComputationFactory.java | 63 ++++
.../apache/giraph/graph/GraphTaskManager.java | 45 +--
.../java/org/apache/giraph/graph/Language.java | 28 ++
.../io/formats/IntIntNullTextInputFormat.java | 7 +-
.../io/formats/LongLongNullTextInputFormat.java | 89 ++++++
.../job/GiraphConfigurationValidator.java | 309 ++++++++-----------
.../org/apache/giraph/jython/DeployType.java | 28 ++
.../giraph/jython/JythonComputationFactory.java | 161 ++++++++++
.../org/apache/giraph/jython/JythonUtils.java | 74 +++++
.../org/apache/giraph/jython/package-info.java | 21 ++
.../apache/giraph/master/SuperstepClasses.java | 37 ++-
.../apache/giraph/utils/ConfigurationUtils.java | 191 ++++++++----
.../giraph/utils/DistributedCacheUtils.java | 101 ++++++
.../java/org/apache/giraph/utils/FileUtils.java | 6 +-
.../giraph/utils/InternalVertexRunner.java | 8 +-
.../apache/giraph/utils/ReflectionUtils.java | 139 ++-------
.../org/apache/giraph/benchmark/page-rank.py | 34 ++
.../test/java/org/apache/giraph/BspCase.java | 8 +-
.../org/apache/giraph/jython/TestJython.java | 77 +++++
.../giraph/utils/TestReflectionUtils.java | 106 +++++++
.../org/apache/giraph/jython/count-edges.py | 8 +
.../giraph/examples/GeneratedVertexReader.java | 25 +-
.../java/org/apache/giraph/TestBspBasic.java | 4 +-
.../org/apache/giraph/TestComputationState.java | 3 +-
.../org/apache/giraph/TestGraphPartitioner.java | 8 +-
.../apache/giraph/hive/HiveGiraphRunner.java | 5 +-
pom.xml | 13 +
50 files changed, 1907 insertions(+), 507 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 76f7e6b..dda130e 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 1.1.0 - unreleased
+ GIRAPH-683: Jython for Computation (nitay)
+
GIRAPH-673: Input superstep should support aggregators like any
other superstep (Bingjing via aching)
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/README
----------------------------------------------------------------------
diff --git a/README b/README
index c2a32ed..fd9ec20 100644
--- a/README
+++ b/README
@@ -131,6 +131,25 @@ locally first.
-------------------------------
+Scripting:
+
+Giraph supports writing user logic in languages other than Java. A Giraph job
+involves, at the very least, a Computation and Input/Output Formats. There are
+other, optional pieces as well, such as Aggregators and Combiners.
+
+As of this writing, we support writing the Computation logic in Jython. The
+Computation class is at the core of the algorithm, so it was a natural starting
+point. Our eventual goal is to let users write any or all components of their
+algorithms in any language they desire.
+
+To use Jython with our job launcher, GiraphRunner, pass the path to the script
+as the Computation class argument. Additionally, set the -jythonClass option to
+the name of your Jython Computation class. Lastly, set -typesHolder to a class
+that implements Giraph's TypesHolder so that Giraph can infer the types you use.
+See page-rank.py (in the org.apache.giraph.benchmark package) for an example.
+
+-------------------------------
+
How to run the unittests on a local pseudo-distributed Hadoop instance:
As mentioned earlier, Giraph supports several versions of Hadoop. In
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/pom.xml
----------------------------------------------------------------------
diff --git a/giraph-core/pom.xml b/giraph-core/pom.xml
index 3ffe175..fdfff4b 100644
--- a/giraph-core/pom.xml
+++ b/giraph-core/pom.xml
@@ -457,10 +457,18 @@ under the License.
<artifactId>jackson-mapper-asl</artifactId>
</dependency>
<dependency>
+ <groupId>org.jodah</groupId>
+ <artifactId>typetools</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
</dependency>
<dependency>
+ <groupId>org.python</groupId>
+ <artifactId>jython</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/benchmark/BenchmarkOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/BenchmarkOption.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/BenchmarkOption.java
index 23c614b..ca1fd19 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/BenchmarkOption.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/BenchmarkOption.java
@@ -55,6 +55,13 @@ public class BenchmarkOption {
new BenchmarkOption(
"l", "localEdgesMinRatio", true,
"Minimum ratio of partition-local edges (default is 0)");
+  /**
+   * Run the Jython implementation of the benchmark's computation instead of
+   * the Java one (checked by PageRankBenchmark).
+   */
+  public static final BenchmarkOption JYTHON = new BenchmarkOption(
+      "j", "jython", false, "Use jython implementation");
+  /**
+   * Location of the computation script. PageRankBenchmark only consults it
+   * when {@link #JYTHON} is on; without it, the bundled script is used.
+   */
+  public static final BenchmarkOption SCRIPT_PATH = new BenchmarkOption(
+      "sp", "scriptPath", true,
+      "Path to script for computation, can be local or HDFS path");
/** Short option */
private String shortOption;
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
index bd2939e..d88ff0d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
+++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java
@@ -21,15 +21,24 @@ package org.apache.giraph.benchmark;
import org.apache.commons.cli.CommandLine;
import org.apache.giraph.combiner.FloatSumCombiner;
import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphTypes;
import org.apache.giraph.edge.IntNullArrayEdges;
import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants;
import org.apache.giraph.io.formats.PseudoRandomIntNullVertexInputFormat;
+import org.apache.giraph.jython.DeployType;
+import org.apache.giraph.jython.JythonUtils;
+import org.apache.giraph.utils.DistributedCacheUtils;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ToolRunner;
import com.google.common.collect.Sets;
import java.util.Set;
+import static org.apache.giraph.jython.JythonComputationFactory.JYTHON_DEPLOY_TYPE;
+
/**
* Benchmark for {@link PageRankComputation}
*/
@@ -38,13 +47,32 @@ public class PageRankBenchmark extends GiraphBenchmark {
public Set<BenchmarkOption> getBenchmarkOptions() {
return Sets.newHashSet(BenchmarkOption.VERTICES,
BenchmarkOption.EDGES_PER_VERTEX, BenchmarkOption.SUPERSTEPS,
- BenchmarkOption.LOCAL_EDGES_MIN_RATIO);
+ BenchmarkOption.LOCAL_EDGES_MIN_RATIO, BenchmarkOption.JYTHON,
+ BenchmarkOption.SCRIPT_PATH);
}
@Override
protected void prepareConfiguration(GiraphConfiguration conf,
CommandLine cmd) {
- conf.setComputationClass(PageRankComputation.class);
+ if (BenchmarkOption.JYTHON.optionTurnedOn(cmd)) {
+ GiraphTypes types = new GiraphTypes();
+ types.inferFrom(PageRankComputation.class);
+ String script;
+ if (BenchmarkOption.SCRIPT_PATH.optionTurnedOn(cmd)) {
+ JYTHON_DEPLOY_TYPE.set(conf, DeployType.DISTRIBUTED_CACHE);
+ String path = BenchmarkOption.SCRIPT_PATH.getOptionValue(cmd);
+ Path hadoopPath = new Path(path);
+ Path remotePath = DistributedCacheUtils.copyToHdfs(hadoopPath, conf);
+ DistributedCache.addCacheFile(remotePath.toUri(), conf);
+ script = remotePath.toString();
+ } else {
+ JYTHON_DEPLOY_TYPE.set(conf, DeployType.RESOURCE);
+ script = ReflectionUtils.getPackagePath(this) + "/page-rank.py";
+ }
+ JythonUtils.init(conf, script, "PageRank", types);
+ } else {
+ conf.setComputationClass(PageRankComputation.class);
+ }
conf.setOutEdgesClass(IntNullArrayEdges.class);
conf.setCombinerClass(FloatSumCombiner.class);
conf.setVertexInputFormatClass(
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/conf/AbstractConfOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/AbstractConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/AbstractConfOption.java
index d00f7e9..e1ab802 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/AbstractConfOption.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/AbstractConfOption.java
@@ -17,6 +17,7 @@
*/
package org.apache.giraph.conf;
+import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
import com.google.common.base.Objects;
@@ -35,6 +36,7 @@ public abstract class AbstractConfOption
/**
* Constructor
+ *
* @param key configuration key
*/
public AbstractConfOption(String key) {
@@ -45,6 +47,16 @@ public abstract class AbstractConfOption
return key;
}
+ /**
+ * Check if option is set in configuration
+ *
+ * @param conf Configuration
+ * @return true if option is set
+ */
+ public boolean contains(Configuration conf) {
+ return conf.get(key) != null;
+ }
+
@Override public int compareTo(AbstractConfOption o) {
return ComparisonChain.start()
.compare(getType(), o.getType())
@@ -79,13 +91,23 @@ public abstract class AbstractConfOption
}
/**
+ * Check if the value set is the same as the default value
+ *
+ * @param conf Configuration
+ * @return true if value set is default value
+ */
+ public abstract boolean isDefaultValue(Configuration conf);
+
+ /**
* Get string representation of default value
+ *
* @return String
*/
public abstract String getDefaultValueStr();
/**
* Get type this option holds
+ *
* @return ConfOptionType
*/
public abstract ConfOptionType getType();
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/conf/AllOptions.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/AllOptions.java b/giraph-core/src/main/java/org/apache/giraph/conf/AllOptions.java
index 5d150d0..68b3ed9 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/AllOptions.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/AllOptions.java
@@ -43,6 +43,7 @@ public class AllOptions {
/**
* Add an option. Subclasses of {@link AbstractConfOption} should call this
* at the end of their constructor.
+ *
* @param confOption option
*/
public static void add(AbstractConfOption confOption) {
@@ -51,6 +52,7 @@ public class AllOptions {
/**
* String representation of all of the options stored
+ *
* @return string
*/
public static String allOptionsString() {
@@ -70,6 +72,7 @@ public class AllOptions {
/**
* Command line utility to dump all Giraph options
+ *
* @param args cmdline args
*/
public static void main(String[] args) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/conf/BooleanConfOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/BooleanConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/BooleanConfOption.java
index c16ec88..f095905 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/BooleanConfOption.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/BooleanConfOption.java
@@ -28,6 +28,7 @@ public class BooleanConfOption extends AbstractConfOption {
/**
* Constructor
+ *
* @param key configuration key
* @param defaultValue default value
*/
@@ -37,10 +38,19 @@ public class BooleanConfOption extends AbstractConfOption {
AllOptions.add(this);
}
- public boolean isDefaultValue() {
+ /**
+ * Get the default value of this option
+ *
+ * @return default value
+ */
+ public boolean getDefaultValue() {
return defaultValue;
}
+ @Override public boolean isDefaultValue(Configuration conf) {
+ return get(conf) == defaultValue;
+ }
+
@Override public String getDefaultValueStr() {
return Boolean.toString(defaultValue);
}
@@ -51,6 +61,7 @@ public class BooleanConfOption extends AbstractConfOption {
/**
* Lookup value in Configuration
+ *
* @param conf Configuration
* @return value for key in conf, or defaultValue if not present
*/
@@ -60,6 +71,7 @@ public class BooleanConfOption extends AbstractConfOption {
/**
* Check if value is true
+ *
* @param conf Configuration
* @return true if value is set and true, false otherwise
*/
@@ -69,6 +81,7 @@ public class BooleanConfOption extends AbstractConfOption {
/**
* Check if value is false
+ *
* @param conf Configuration
* @return true if value is set and true, false otherwise
*/
@@ -78,6 +91,7 @@ public class BooleanConfOption extends AbstractConfOption {
/**
* Set value in configuration for this key
+ *
* @param conf Configuration
* @param value to set
*/
@@ -87,6 +101,7 @@ public class BooleanConfOption extends AbstractConfOption {
/**
* Set value in configuration if it hasn't been set already
+ *
* @param conf Configuration
* @param value to set
*/
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/conf/ClassConfOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ClassConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/ClassConfOption.java
index 41d120b..43d6b0f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ClassConfOption.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ClassConfOption.java
@@ -20,8 +20,11 @@ package org.apache.giraph.conf;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
+import com.google.common.base.Objects;
+
/**
* Class configuration option
+ *
* @param <C> interface of class
*/
public class ClassConfOption<C> extends AbstractConfOption {
@@ -35,6 +38,7 @@ public class ClassConfOption<C> extends AbstractConfOption {
/**
* Private constructor
+ *
* @param key Key
* @param defaultClass default class
* @param interfaceClass interface class
@@ -49,6 +53,7 @@ public class ClassConfOption<C> extends AbstractConfOption {
/**
* Static create method
+ *
* @param key key
* @param defaultClass default class
* @param interfaceClass interface class
@@ -68,6 +73,10 @@ public class ClassConfOption<C> extends AbstractConfOption {
return interfaceClass;
}
+ @Override public boolean isDefaultValue(Configuration conf) {
+ return Objects.equal(get(conf), defaultClass);
+ }
+
@Override public String getDefaultValueStr() {
return defaultClass == null ? "null" : defaultClass.getSimpleName();
}
@@ -87,6 +96,7 @@ public class ClassConfOption<C> extends AbstractConfOption {
/**
* Lookup value
+ *
* @param conf Configuration
* @return Class set for key, or defaultClass
*/
@@ -96,6 +106,7 @@ public class ClassConfOption<C> extends AbstractConfOption {
/**
* Lookup array of classes for key
+ *
* @param conf Configuration
* @return array of classes
*/
@@ -127,6 +138,7 @@ public class ClassConfOption<C> extends AbstractConfOption {
/**
* Lookup with user specified default value
+ *
* @param conf Configuration
* @param defaultValue default value
* @return Class
@@ -138,6 +150,7 @@ public class ClassConfOption<C> extends AbstractConfOption {
/**
* Set value for key
+ *
* @param conf Configuration
* @param klass Class to set
*/
@@ -146,7 +159,20 @@ public class ClassConfOption<C> extends AbstractConfOption {
}
/**
+ * Set value for key if it is not already set
+ *
+ * @param conf Configuration
+ * @param klass Class to set
+ */
+ public void setIfUnset(Configuration conf, Class<? extends C> klass) {
+ if (!contains(conf)) {
+ set(conf, klass);
+ }
+ }
+
+ /**
* Set classes for this key
+ *
* @param conf Configuration
* @param klasses Classes to set
*/
@@ -165,6 +191,7 @@ public class ClassConfOption<C> extends AbstractConfOption {
/**
* Add class to list for key
+ *
* @param conf Configuration
* @param klass Class to add
*/
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/conf/ConfOptionType.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ConfOptionType.java b/giraph-core/src/main/java/org/apache/giraph/conf/ConfOptionType.java
index 8f70d90..edb878a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ConfOptionType.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ConfOptionType.java
@@ -25,6 +25,8 @@ public enum ConfOptionType {
BOOLEAN,
/** class */
CLASS,
+ /** enum */
+ ENUM,
/** integer */
INTEGER,
/** float */
@@ -32,5 +34,5 @@ public enum ConfOptionType {
/** long */
LONG,
/** string */
- STRING;
+ STRING
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/conf/EnumConfOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/EnumConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/EnumConfOption.java
new file mode 100644
index 0000000..0e23379
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/EnumConfOption.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.conf;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Objects;
+
+/**
+ * Enum Configuration option
+ *
+ * @param <T> Enum class
+ */
+public class EnumConfOption<T extends Enum<T>> extends AbstractConfOption {
+ /** Enum class */
+ private final Class<T> klass;
+ /** Default value (may be null) */
+ private final T defaultValue;
+
+ /**
+ * Constructor
+ *
+ * @param key Configuration key
+ * @param klass Enum class
+ * @param defaultValue default value, may be null
+ */
+ public EnumConfOption(String key, Class<T> klass, T defaultValue) {
+ super(key);
+ this.klass = klass;
+ this.defaultValue = defaultValue;
+ AllOptions.add(this);
+ }
+
+ /**
+ * Create new EnumConfOption
+ *
+ * @param key String configuration key
+ * @param klass enum class
+ * @param defaultValue default enum value
+ * @param <X> enum type
+ * @return EnumConfOption
+ */
+ public static <X extends Enum<X>> EnumConfOption<X>
+ create(String key, Class<X> klass, X defaultValue) {
+ return new EnumConfOption<X>(key, klass, defaultValue);
+ }
+
+ /**
+ * Get the default value of this option
+ *
+ * @return default value, possibly null
+ */
+ public T getDefaultValue() {
+ return defaultValue;
+ }
+
+ @Override public boolean isDefaultValue(Configuration conf) {
+ return Objects.equal(get(conf), defaultValue);
+ }
+
+ @Override public String getDefaultValueStr() {
+ // Mirror ClassConfOption: a null default must not cause an NPE here
+ return defaultValue == null ? "null" : defaultValue.name();
+ }
+
+ @Override public ConfOptionType getType() {
+ return ConfOptionType.ENUM;
+ }
+
+ /**
+ * Lookup value
+ *
+ * @param conf Configuration
+ * @return enum value for key, or defaultValue (possibly null) if not present
+ */
+ public T get(Configuration conf) {
+ String valueStr = conf.get(getKey());
+ // Return the default directly instead of round-tripping it through its
+ // name, so that a null default does not throw NullPointerException.
+ if (valueStr == null) {
+ return defaultValue;
+ }
+ return Enum.valueOf(klass, valueStr);
+ }
+
+ /**
+ * Set value
+ *
+ * @param conf Configuration
+ * @param value to set
+ */
+ public void set(Configuration conf, Enum<T> value) {
+ conf.set(getKey(), value.name());
+ }
+
+ /**
+ * Set value if it's not already present
+ *
+ * @param conf Configuration
+ * @param value to set
+ */
+ public void setIfUnset(Configuration conf, Enum<T> value) {
+ if (conf.get(getKey()) == null) {
+ set(conf, value);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/conf/FloatConfOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/FloatConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/FloatConfOption.java
index fa21a28..62efdbe 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/FloatConfOption.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/FloatConfOption.java
@@ -28,6 +28,7 @@ public class FloatConfOption extends AbstractConfOption {
/**
* Constructor
+ *
* @param key Configuration key
* @param defaultValue default value
*/
@@ -41,6 +42,10 @@ public class FloatConfOption extends AbstractConfOption {
return defaultValue;
}
+ @Override public boolean isDefaultValue(Configuration conf) {
+ return Float.compare(get(conf), defaultValue) == 0;
+ }
+
@Override public String getDefaultValueStr() {
return Float.toString(defaultValue);
}
@@ -51,6 +56,7 @@ public class FloatConfOption extends AbstractConfOption {
/**
* Lookup value
+ *
* @param conf Configuration
* @return value for key, or defaultValue if not present
*/
@@ -60,6 +66,7 @@ public class FloatConfOption extends AbstractConfOption {
/**
* Set value
+ *
* @param conf Configuration
* @param value to set
*/
@@ -69,11 +76,12 @@ public class FloatConfOption extends AbstractConfOption {
/**
* Set value if it's not already present
+ *
* @param conf Configuration
* @param value to set
*/
public void setIfUnset(Configuration conf, float value) {
- if (conf.get(getKey()) == null) {
+ if (!contains(conf)) {
conf.setFloat(getKey(), value);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
index 621bb14..f039bdc 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
@@ -20,34 +20,35 @@ package org.apache.giraph.conf;
import org.apache.giraph.aggregators.AggregatorWriter;
import org.apache.giraph.aggregators.TextAggregatorWriter;
import org.apache.giraph.combiner.Combiner;
-import org.apache.giraph.graph.Computation;
import org.apache.giraph.edge.ByteArrayEdges;
import org.apache.giraph.edge.OutEdges;
+import org.apache.giraph.graph.Computation;
+import org.apache.giraph.graph.ComputationFactory;
+import org.apache.giraph.graph.DefaultComputationFactory;
import org.apache.giraph.graph.DefaultVertexResolver;
import org.apache.giraph.graph.DefaultVertexValueFactory;
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.graph.VertexValueFactory;
+import org.apache.giraph.io.EdgeInputFormat;
+import org.apache.giraph.io.VertexInputFormat;
+import org.apache.giraph.io.VertexOutputFormat;
import org.apache.giraph.io.filters.DefaultEdgeInputFilter;
import org.apache.giraph.io.filters.DefaultVertexInputFilter;
import org.apache.giraph.io.filters.EdgeInputFilter;
-import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.io.filters.VertexInputFilter;
-import org.apache.giraph.io.VertexInputFormat;
-import org.apache.giraph.io.VertexOutputFormat;
import org.apache.giraph.master.DefaultMasterCompute;
import org.apache.giraph.master.MasterCompute;
import org.apache.giraph.partition.GraphPartitionerFactory;
import org.apache.giraph.partition.HashPartitionerFactory;
import org.apache.giraph.partition.Partition;
import org.apache.giraph.partition.SimplePartition;
-import org.apache.giraph.utils.ReflectionUtils;
import org.apache.giraph.worker.DefaultWorkerContext;
import org.apache.giraph.worker.WorkerContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import java.util.List;
+import static org.apache.giraph.utils.ReflectionUtils.getTypeArguments;
/**
* Holder for classes used by Giraph.
@@ -60,20 +61,16 @@ import java.util.List;
public class GiraphClasses<I extends WritableComparable,
V extends Writable, E extends Writable>
implements GiraphConstants {
+ /** ComputationFactory class - cached for fast access */
+ protected Class<? extends ComputationFactory<I, V, E,
+ ? extends Writable, ? extends Writable>>
+ computationFactoryClass;
/** Computation class - cached for fast access */
protected Class<? extends
Computation<I, V, E, ? extends Writable, ? extends Writable>>
computationClass;
- /** Vertex id class - cached for fast access */
- protected Class<I> vertexIdClass;
- /** Vertex value class - cached for fast access */
- protected Class<V> vertexValueClass;
- /** Edge value class - cached for fast access */
- protected Class<E> edgeValueClass;
- /** Incoming message value class - cached for fast access */
- protected Class<? extends Writable> incomingMessageValueClass;
- /** Outgoing message value class - cached for fast access */
- protected Class<? extends Writable> outgoingMessageValueClass;
+ /** Generic types used to describe graph */
+ protected GiraphTypes<I, V, E> giraphTypes;
/** Vertex edges class - cached for fast access */
protected Class<? extends OutEdges<I, E>> outEdgesClass;
/** Input vertex edges class - cached for fast access */
@@ -123,6 +120,10 @@ public class GiraphClasses<I extends WritableComparable,
public GiraphClasses() {
// Note: the cast to Object is required in order for javac to accept the
// downcast.
+ computationFactoryClass = (Class<? extends ComputationFactory<I, V, E,
+ ? extends Writable, ? extends Writable>>) (Object)
+ DefaultComputationFactory.class;
+ giraphTypes = new GiraphTypes<I, V, E>();
outEdgesClass = (Class<? extends OutEdges<I, E>>) (Object)
ByteArrayEdges.class;
inputOutEdgesClass = (Class<? extends OutEdges<I, E>>) (Object)
@@ -151,27 +152,15 @@ public class GiraphClasses<I extends WritableComparable,
* @param conf Configuration object to read from.
*/
public GiraphClasses(Configuration conf) {
- readFromConf(conf);
- }
-
- /**
- * Read classes from Configuration.
- *
- * @param conf Configuration to read from.
- */
- private void readFromConf(Configuration conf) {
- // set pre-validated generic parameter types into Configuration
+ giraphTypes = GiraphTypes.readFrom(conf);
+ computationFactoryClass =
+ (Class<? extends ComputationFactory<I, V, E,
+ ? extends Writable, ? extends Writable>>)
+ COMPUTATION_FACTORY_CLASS.get(conf);
computationClass =
(Class<? extends
Computation<I, V, E, ? extends Writable, ? extends Writable>>)
COMPUTATION_CLASS.get(conf);
- List<Class<?>> classList =
- ReflectionUtils.getTypeArguments(Computation.class, computationClass);
- vertexIdClass = (Class<I>) classList.get(0);
- vertexValueClass = (Class<V>) classList.get(1);
- edgeValueClass = (Class<E>) classList.get(2);
- incomingMessageValueClass = (Class<? extends Writable>) classList.get(3);
- outgoingMessageValueClass = (Class<? extends Writable>) classList.get(4);
outEdgesClass = (Class<? extends OutEdges<I, E>>)
VERTEX_EDGES_CLASS.get(conf);
@@ -207,6 +196,16 @@ public class GiraphClasses<I extends WritableComparable,
VERTEX_INPUT_FILTER_CLASS.get(conf);
}
+ /**
+ * Get ComputationFactory class
+ *
+ * @return ComputationFactory class
+ */
+ public Class<? extends ComputationFactory<I, V, E,
+ ? extends Writable, ? extends Writable>> getComputationFactoryClass() {
+ return computationFactoryClass;
+ }
+
/**
* Get Computation class
*
@@ -218,13 +212,22 @@ public class GiraphClasses<I extends WritableComparable,
return computationClass;
}
+ /**
+ * Get generic types used to describe graph
+ *
+ * @return GiraphTypes
+ */
+ public GiraphTypes<I, V, E> getGiraphTypes() {
+ return giraphTypes;
+ }
+
/**
* Get Vertex ID class
*
* @return Vertex ID class
*/
public Class<I> getVertexIdClass() {
- return vertexIdClass;
+ return giraphTypes.getVertexIdClass();
}
/**
@@ -233,7 +231,7 @@ public class GiraphClasses<I extends WritableComparable,
* @return Vertex Value class
*/
public Class<V> getVertexValueClass() {
- return vertexValueClass;
+ return giraphTypes.getVertexValueClass();
}
/**
@@ -242,7 +240,7 @@ public class GiraphClasses<I extends WritableComparable,
* @return Edge Value class
*/
public Class<E> getEdgeValueClass() {
- return edgeValueClass;
+ return giraphTypes.getEdgeValueClass();
}
/**
@@ -252,7 +250,7 @@ public class GiraphClasses<I extends WritableComparable,
* @return Message Value class
*/
public Class<? extends Writable> getIncomingMessageValueClass() {
- return incomingMessageValueClass;
+ return giraphTypes.getIncomingMessageValueClass();
}
/**
@@ -262,7 +260,7 @@ public class GiraphClasses<I extends WritableComparable,
* @return Message Value class
*/
public Class<? extends Writable> getOutgoingMessageValueClass() {
- return outgoingMessageValueClass;
+ return giraphTypes.getOutgoingMessageValueClass();
}
/**
@@ -274,10 +272,11 @@ public class GiraphClasses<I extends WritableComparable,
return outEdgesClass;
}
- /* Get Vertex edges class used during edge-based input
- *
- * @return Vertex edges class.
- */
+ /**
+ * Get Vertex edges class used during edge-based input
+ *
+ * @return Vertex edges class.
+ */
public Class<? extends OutEdges<I, E>> getInputOutEdgesClass() {
return inputOutEdgesClass;
}
@@ -486,10 +485,14 @@ public class GiraphClasses<I extends WritableComparable,
Computation<I, V, E, ? extends Writable, ? extends Writable>>
computationClass) {
this.computationClass = computationClass;
- List<Class<?>> classList =
- ReflectionUtils.getTypeArguments(Computation.class, computationClass);
- incomingMessageValueClass = (Class<? extends Writable>) classList.get(3);
- outgoingMessageValueClass = (Class<? extends Writable>) classList.get(4);
+ if (computationClass != null) {
+ Class<?>[] classList =
+ getTypeArguments(TypesHolder.class, computationClass);
+ giraphTypes.setIncomingMessageValueClass(
+ (Class<? extends Writable>) classList[3]);
+ giraphTypes.setOutgoingMessageValueClass(
+ (Class<? extends Writable>) classList[4]);
+ }
return this;
}
@@ -500,7 +503,7 @@ public class GiraphClasses<I extends WritableComparable,
* @return this
*/
public GiraphClasses setVertexIdClass(Class<I> vertexIdClass) {
- this.vertexIdClass = vertexIdClass;
+ giraphTypes.setVertexIdClass(vertexIdClass);
return this;
}
@@ -511,7 +514,7 @@ public class GiraphClasses<I extends WritableComparable,
* @return this
*/
public GiraphClasses setVertexValueClass(Class<V> vertexValueClass) {
- this.vertexValueClass = vertexValueClass;
+ giraphTypes.setVertexValueClass(vertexValueClass);
return this;
}
@@ -522,7 +525,7 @@ public class GiraphClasses<I extends WritableComparable,
* @return this
*/
public GiraphClasses setEdgeValueClass(Class<E> edgeValueClass) {
- this.edgeValueClass = edgeValueClass;
+ giraphTypes.setEdgeValueClass(edgeValueClass);
return this;
}
@@ -535,7 +538,7 @@ public class GiraphClasses<I extends WritableComparable,
*/
public GiraphClasses setIncomingMessageValueClass(
Class<? extends Writable> incomingMessageValueClass) {
- this.incomingMessageValueClass = incomingMessageValueClass;
+ giraphTypes.setIncomingMessageValueClass(incomingMessageValueClass);
return this;
}
@@ -548,7 +551,7 @@ public class GiraphClasses<I extends WritableComparable,
*/
public GiraphClasses setOutgoingMessageValueClass(
Class<? extends Writable> outgoingMessageValueClass) {
- this.outgoingMessageValueClass = outgoingMessageValueClass;
+ giraphTypes.setOutgoingMessageValueClass(outgoingMessageValueClass);
return this;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 58a3f01..87c4c18 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -20,9 +20,10 @@ package org.apache.giraph.conf;
import org.apache.giraph.aggregators.AggregatorWriter;
import org.apache.giraph.combiner.Combiner;
-import org.apache.giraph.graph.Computation;
import org.apache.giraph.edge.OutEdges;
import org.apache.giraph.edge.ReuseObjectsOutEdges;
+import org.apache.giraph.graph.Computation;
+import org.apache.giraph.graph.ComputationFactory;
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.graph.VertexValueFactory;
import org.apache.giraph.io.EdgeInputFormat;
@@ -36,6 +37,7 @@ import org.apache.giraph.master.MasterObserver;
import org.apache.giraph.partition.GraphPartitionerFactory;
import org.apache.giraph.partition.Partition;
import org.apache.giraph.partition.ReusesObjectsPartition;
+import org.apache.giraph.utils.ReflectionUtils;
import org.apache.giraph.worker.WorkerContext;
import org.apache.giraph.worker.WorkerObserver;
import org.apache.hadoop.conf.Configuration;
@@ -78,6 +80,27 @@ public class GiraphConfiguration extends Configuration
}
/**
+ * Get name of computation being run. We leave this up to the
+ * {@link ComputationFactory} to decide what to return.
+ *
+ * @return Name of computation being run
+ */
+ public String getComputationName() {
+ ComputationFactory compFactory = ReflectionUtils.newInstance(
+ getComputationFactoryClass());
+ return compFactory.computationName(this);
+ }
+
+ /**
+ * Get the user's subclassed {@link ComputationFactory}
+ *
+ * @return User's computation factory class
+ */
+ public Class<? extends ComputationFactory> getComputationFactoryClass() {
+ return COMPUTATION_FACTORY_CLASS.get(this);
+ }
+
+ /**
* Get the user's subclassed {@link Computation}
*
* @return User's computation class
@@ -525,6 +548,7 @@ public class GiraphConfiguration extends Configuration
/**
* Getter for SPLIT_MASTER_WORKER flag.
+ *
* @return boolean flag value.
*/
public final boolean getSplitMasterWorker() {
@@ -570,6 +594,7 @@ public class GiraphConfiguration extends Configuration
/**
* Is this a "pure YARN" Giraph job, or is a MapReduce layer (v1 or v2)
* actually managing our cluster nodes, i.e. each task is a Mapper.
+ *
* @return TRUE if this is a pure YARN job.
*/
public boolean isPureYarnJob() {
@@ -579,6 +604,7 @@ public class GiraphConfiguration extends Configuration
/**
* Jars required in "Pure YARN" jobs (names only, no paths) should
* be listed here in full, including Giraph framework jar(s).
+ *
* @return the comma-separated list of jar names for export to cluster.
*/
public String getYarnLibJars() {
@@ -587,6 +613,7 @@ public class GiraphConfiguration extends Configuration
/**
* Populate jar list for Pure YARN jobs.
+ *
* @param jarList a comma-separated list of jar names
*/
public void setYarnLibJars(String jarList) {
@@ -596,6 +623,7 @@ public class GiraphConfiguration extends Configuration
/**
* Get heap size (in MB) for each task in our Giraph job run,
* assuming this job will run on the "pure YARN" profile.
+ *
* @return the heap size for all tasks, in MB
*/
public int getYarnTaskHeapMb() {
@@ -605,6 +633,7 @@ public class GiraphConfiguration extends Configuration
/**
* Set heap size for Giraph tasks in our job run, assuming
* the job will run on the "pure YARN" profile.
+ *
* @param heapMb the heap size for all tasks
*/
public void setYarnTaskHeapMb(int heapMb) {
@@ -635,6 +664,7 @@ public class GiraphConfiguration extends Configuration
/**
* is this job run a local test?
+ *
* @return the test status as recorded in the Configuration
*/
public boolean getLocalTestMode() {
@@ -643,6 +673,7 @@ public class GiraphConfiguration extends Configuration
/**
* Flag this job as a local test run.
+ *
* @param flag the test status for this job
*/
public void setLocalTestMode(boolean flag) {
@@ -652,6 +683,7 @@ public class GiraphConfiguration extends Configuration
/**
* The number of server tasks in our ZK quorum for
* this job run.
+ *
* @return the number of ZK servers in the quorum
*/
public int getZooKeeperServerCount() {
@@ -1014,6 +1046,7 @@ public class GiraphConfiguration extends Configuration
/**
* Get the output directory to write YourKit snapshots to
+ *
* @param context Map context
* @return output directory
*/
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index 2d0f59c..f090baa 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -20,11 +20,14 @@ package org.apache.giraph.conf;
import org.apache.giraph.aggregators.AggregatorWriter;
import org.apache.giraph.aggregators.TextAggregatorWriter;
import org.apache.giraph.combiner.Combiner;
-import org.apache.giraph.graph.Computation;
import org.apache.giraph.edge.ByteArrayEdges;
import org.apache.giraph.edge.OutEdges;
+import org.apache.giraph.graph.Computation;
+import org.apache.giraph.graph.ComputationFactory;
+import org.apache.giraph.graph.DefaultComputationFactory;
import org.apache.giraph.graph.DefaultVertexResolver;
import org.apache.giraph.graph.DefaultVertexValueFactory;
+import org.apache.giraph.graph.Language;
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.graph.VertexValueFactory;
import org.apache.giraph.io.EdgeInputFormat;
@@ -64,6 +67,14 @@ public interface GiraphConstants {
ClassConfOption<Computation> COMPUTATION_CLASS =
ClassConfOption.create("giraph.computationClass", null,
Computation.class);
+ /** Computation factory class - optional */
+ ClassConfOption<ComputationFactory> COMPUTATION_FACTORY_CLASS =
+ ClassConfOption.create("giraph.computation.factory.class",
+ DefaultComputationFactory.class, ComputationFactory.class);
+ /** TypesHolder, used if Computation not set - optional */
+ ClassConfOption<TypesHolder> TYPES_HOLDER_CLASS =
+ ClassConfOption.create("giraph.typesHolder", null,
+ TypesHolder.class);
/** Vertex value factory class - optional */
ClassConfOption<VertexValueFactory> VERTEX_VALUE_FACTORY_CLASS =
ClassConfOption.create("giraph.vertexValueFactoryClass",
@@ -97,6 +108,11 @@ public interface GiraphConstants {
ClassConfOption.create("giraph.vertexResolverClass",
DefaultVertexResolver.class, VertexResolver.class);
+ /** Which language computation is implemented in */
+ EnumConfOption<Language> COMPUTATION_LANGUAGE =
+ EnumConfOption.create("giraph.computation.language",
+ Language.class, Language.JAVA);
+
/**
* Option of whether to create vertexes that were not existent before but
* received messages
@@ -142,7 +158,7 @@ public interface GiraphConstants {
* application, saveVertex will be called right after each vertex.compute()
* is called.
* NOTE: This feature doesn't work well with checkpointing - if you restart
- * from a checkpoint you won't have any ouptut from previous supresteps.
+ * from a checkpoint you won't have any output from previous supersteps.
*/
BooleanConfOption DO_OUTPUT_DURING_COMPUTATION =
new BooleanConfOption("giraph.doOutputDuringComputation", false);
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/conf/GiraphTypes.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphTypes.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphTypes.java
new file mode 100644
index 0000000..0727270
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphTypes.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.conf;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.base.Preconditions;
+
+import static org.apache.giraph.conf.GiraphConstants.EDGE_VALUE_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.INCOMING_MESSAGE_VALUE_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.OUTGOING_MESSAGE_VALUE_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.VERTEX_ID_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.VERTEX_VALUE_CLASS;
+import static org.apache.giraph.utils.ConfigurationUtils.getTypesHolderClass;
+import static org.apache.giraph.utils.ReflectionUtils.getTypeArguments;
+
+/**
+ * Holder for the generic types that describe the user's graph.
+ *
+ * @param <I> Vertex ID class
+ * @param <V> Vertex Value class
+ * @param <E> Edge Value class
+ */
+public class GiraphTypes<I extends WritableComparable, V extends Writable,
+ E extends Writable> {
+ /** Vertex id class */
+ private Class<I> vertexIdClass;
+ /** Vertex value class */
+ private Class<V> vertexValueClass;
+ /** Edge value class */
+ private Class<E> edgeValueClass;
+ /** Incoming message value class */
+ private Class<? extends Writable> incomingMessageValueClass;
+ /** Outgoing message value class */
+ private Class<? extends Writable> outgoingMessageValueClass;
+
+ /**
+ * Empty constructor; all type classes start out null until set or read.
+ */
+ public GiraphTypes() { }
+
+ /**
+ * Constructor taking all five type classes
+ *
+ * @param vertexIdClass vertex id class
+ * @param vertexValueClass vertex value class
+ * @param edgeValueClass edge value class
+ * @param incomingMessageValueClass incoming message class
+ * @param outgoingMessageValueClass outgoing message class
+ */
+ public GiraphTypes(Class<I> vertexIdClass,
+ Class<V> vertexValueClass,
+ Class<E> edgeValueClass,
+ Class<? extends Writable> incomingMessageValueClass,
+ Class<? extends Writable> outgoingMessageValueClass) {
+ this.edgeValueClass = edgeValueClass;
+ this.incomingMessageValueClass = incomingMessageValueClass;
+ this.outgoingMessageValueClass = outgoingMessageValueClass;
+ this.vertexIdClass = vertexIdClass;
+ this.vertexValueClass = vertexValueClass;
+ }
+
+ /**
+ * Read types from a {@link Configuration}.
+ * First tries to read them directly from the configuration options.
+ * If not all are set, tries to infer them from a {@link TypesHolder}.
+ *
+ * @param conf Configuration
+ * @param <IX> vertex id
+ * @param <VX> vertex value
+ * @param <EX> edge value
+ * @return GiraphTypes
+ */
+ public static <IX extends WritableComparable, VX extends Writable,
+ EX extends Writable> GiraphTypes<IX, VX, EX> readFrom(
+ Configuration conf) {
+ GiraphTypes<IX, VX, EX> types = new GiraphTypes<IX, VX, EX>();
+ types.readDirect(conf);
+ if (!types.hasData()) {
+ Class<? extends TypesHolder> klass = getTypesHolderClass(conf);
+ if (klass != null) {
+ types.inferFrom(klass);
+ }
+ }
+ return types;
+ }
+
+ /**
+ * Infer all five types from the type arguments of a TypesHolder class
+ *
+ * @param klass TypesHolder class (e.g. a Computation subclass)
+ */
+ public void inferFrom(Class<? extends TypesHolder> klass) {
+ Class<?>[] classList = getTypeArguments(TypesHolder.class, klass);
+ Preconditions.checkArgument(classList.length == 5);
+ vertexIdClass = (Class<I>) classList[0];
+ vertexValueClass = (Class<V>) classList[1];
+ edgeValueClass = (Class<E>) classList[2];
+ incomingMessageValueClass = (Class<? extends Writable>) classList[3];
+ outgoingMessageValueClass = (Class<? extends Writable>) classList[4];
+ }
+
+ /**
+ * Read types directly from the Configuration options
+ *
+ * @param conf Configuration
+ */
+ private void readDirect(Configuration conf) {
+ vertexIdClass = (Class<I>) VERTEX_ID_CLASS.get(conf);
+ vertexValueClass = (Class<V>) VERTEX_VALUE_CLASS.get(conf);
+ edgeValueClass = (Class<E>) EDGE_VALUE_CLASS.get(conf);
+ incomingMessageValueClass = INCOMING_MESSAGE_VALUE_CLASS.get(conf);
+ outgoingMessageValueClass = OUTGOING_MESSAGE_VALUE_CLASS.get(conf);
+ }
+
+ /**
+ * Check if all five types are set
+ *
+ * @return true if every type class is non-null
+ */
+ public boolean hasData() {
+ return vertexIdClass != null &&
+ vertexValueClass != null &&
+ edgeValueClass != null &&
+ incomingMessageValueClass != null &&
+ outgoingMessageValueClass != null;
+ }
+
+ /**
+ * Write types to Configuration, overwriting any existing values
+ *
+ * @param conf Configuration
+ */
+ public void writeTo(Configuration conf) {
+ VERTEX_ID_CLASS.set(conf, vertexIdClass);
+ VERTEX_VALUE_CLASS.set(conf, vertexValueClass);
+ EDGE_VALUE_CLASS.set(conf, edgeValueClass);
+ INCOMING_MESSAGE_VALUE_CLASS.set(conf, incomingMessageValueClass);
+ OUTGOING_MESSAGE_VALUE_CLASS.set(conf, outgoingMessageValueClass);
+ }
+
+ /**
+ * Write each type to Configuration only if its option is not already set
+ *
+ * @param conf Configuration
+ */
+ public void writeIfUnset(Configuration conf) {
+ VERTEX_ID_CLASS.setIfUnset(conf, vertexIdClass);
+ VERTEX_VALUE_CLASS.setIfUnset(conf, vertexValueClass);
+ EDGE_VALUE_CLASS.setIfUnset(conf, edgeValueClass);
+ INCOMING_MESSAGE_VALUE_CLASS.setIfUnset(conf, incomingMessageValueClass);
+ OUTGOING_MESSAGE_VALUE_CLASS.setIfUnset(conf, outgoingMessageValueClass);
+ }
+
+ public Class<E> getEdgeValueClass() {
+ return edgeValueClass;
+ }
+
+ public Class<? extends Writable> getIncomingMessageValueClass() {
+ return incomingMessageValueClass;
+ }
+
+ public Class<? extends Writable> getOutgoingMessageValueClass() {
+ return outgoingMessageValueClass;
+ }
+
+ public Class<I> getVertexIdClass() {
+ return vertexIdClass;
+ }
+
+ public Class<V> getVertexValueClass() {
+ return vertexValueClass;
+ }
+
+ public void setEdgeValueClass(Class<E> edgeValueClass) {
+ this.edgeValueClass = edgeValueClass;
+ }
+
+ public void setIncomingMessageValueClass(
+ Class<? extends Writable> incomingMessageValueClass) {
+ this.incomingMessageValueClass = incomingMessageValueClass;
+ }
+
+ public void setOutgoingMessageValueClass(
+ Class<? extends Writable> outgoingMessageValueClass) {
+ this.outgoingMessageValueClass = outgoingMessageValueClass;
+ }
+
+ public void setVertexIdClass(Class<I> vertexIdClass) {
+ this.vertexIdClass = vertexIdClass;
+ }
+
+ public void setVertexValueClass(Class<V> vertexValueClass) {
+ this.vertexValueClass = vertexValueClass;
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index dbd2452..64d0fb2 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -24,6 +24,7 @@ import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.edge.OutEdges;
import org.apache.giraph.edge.ReusableEdge;
+import org.apache.giraph.graph.ComputationFactory;
import org.apache.giraph.graph.DefaultVertex;
import org.apache.giraph.graph.Computation;
import org.apache.giraph.graph.Vertex;
@@ -114,6 +115,7 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
/**
* Configure an object with this instance if the object is configurable.
+ *
* @param obj Object
*/
public void configureIfPossible(Object obj) {
@@ -134,6 +136,7 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
/**
* Get the edge input filter to use
+ *
* @return EdgeInputFilter
*/
public EdgeInputFilter getEdgeInputFilter() {
@@ -152,6 +155,7 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
/**
* Get the vertex input filter to use
+ *
* @return VertexInputFilter
*/
public VertexInputFilter getVertexInputFilter() {
@@ -450,13 +454,44 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
}
/**
+ * Get computation factory class
+ *
+ * @return computation factory class
+ */
+ @Override
+ public Class<? extends ComputationFactory<I, V, E,
+ ? extends Writable, ? extends Writable>>
+ getComputationFactoryClass() {
+ return classes.getComputationFactoryClass();
+ }
+
+ /**
+ * Create a new instance of the configured computation factory class
+ *
+ * @return newly created ComputationFactory
+ */
+ public ComputationFactory<I, V, E, ? extends Writable, ? extends Writable>
+ createComputationFactory() {
+ return ReflectionUtils.newInstance(getComputationFactoryClass(), this);
+ }
+
+ /**
* Create a user computation
*
* @return Instantiated user computation
*/
public Computation<I, V, E, ? extends Writable, ? extends Writable>
createComputation() {
- return ReflectionUtils.newInstance(getComputationClass(), this);
+ return createComputationFactory().getComputation(this);
+ }
+
+ /**
+ * Get user types describing graph (I,V,E,M1,M2)
+ *
+ * @return GiraphTypes
+ */
+ public GiraphTypes<I, V, E> getGiraphTypes() {
+ return classes.getGiraphTypes();
}
/**
@@ -554,6 +589,7 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
/**
* Create job observer
+ *
* @return GiraphJobObserver set in configuration.
*/
public GiraphJobObserver getJobObserver() {
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/conf/IntConfOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/IntConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/IntConfOption.java
index de75e9d..142c090 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/IntConfOption.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/IntConfOption.java
@@ -28,6 +28,7 @@ public class IntConfOption extends AbstractConfOption {
/**
* Constructor
+ *
* @param key key
* @param defaultValue default value
*/
@@ -39,6 +40,7 @@ public class IntConfOption extends AbstractConfOption {
/**
* Constructor
+ *
* @param key key
* @param defaultValue default value
*/
@@ -52,6 +54,10 @@ public class IntConfOption extends AbstractConfOption {
return defaultValue;
}
+ @Override public boolean isDefaultValue(Configuration conf) {
+ return get(conf) == defaultValue;
+ }
+
@Override public String getDefaultValueStr() {
return Integer.toString(defaultValue);
}
@@ -62,6 +68,7 @@ public class IntConfOption extends AbstractConfOption {
/**
* Lookup value
+ *
* @param conf Configuration
* @return value for key, or default value if not set
*/
@@ -71,6 +78,7 @@ public class IntConfOption extends AbstractConfOption {
/**
* Set value
+ *
* @param conf Configuration
* @param value to set
*/
@@ -80,11 +88,12 @@ public class IntConfOption extends AbstractConfOption {
/**
* Set value if it's not already present
+ *
* @param conf Configuration
* @param value to set
*/
public void setIfUnset(Configuration conf, int value) {
- if (conf.get(getKey()) == null) {
+ if (!contains(conf)) {
conf.setInt(getKey(), value);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/conf/LongConfOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/LongConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/LongConfOption.java
index 0cbc164..a5c939d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/LongConfOption.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/LongConfOption.java
@@ -28,6 +28,7 @@ public class LongConfOption extends AbstractConfOption {
/**
* Constructor
+ *
* @param key key
* @param defaultValue default value
*/
@@ -41,6 +42,10 @@ public class LongConfOption extends AbstractConfOption {
return defaultValue;
}
+ @Override public boolean isDefaultValue(Configuration conf) {
+ return get(conf) == defaultValue;
+ }
+
@Override public String getDefaultValueStr() {
return Long.toString(defaultValue);
}
@@ -51,6 +56,7 @@ public class LongConfOption extends AbstractConfOption {
/**
* Lookup value
+ *
* @param conf Configuration
* @return value set for key, or defaultValue
*/
@@ -59,7 +65,19 @@ public class LongConfOption extends AbstractConfOption {
}
/**
+ * Lookup value, using the passed-in default value if not found.
+ *
+ * @param conf Configuration
+ * @param val default value to use
+ * @return value set for key, or default value passed in
+ */
+ public long getWithDefault(Configuration conf, long val) {
+ return conf.getLong(getKey(), val);
+ }
+
+ /**
* Set value for key
+ *
* @param conf Configuration
* @param value to set
*/
@@ -69,11 +87,12 @@ public class LongConfOption extends AbstractConfOption {
/**
* Set value if it's not already present
+ *
* @param conf Configuration
* @param value to set
*/
public void setIfUnset(Configuration conf, long value) {
- if (conf.get(getKey()) == null) {
+ if (!contains(conf)) {
conf.setLong(getKey(), value);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/conf/StrConfOption.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/StrConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/StrConfOption.java
index 83a583d..7c3a993 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/StrConfOption.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/StrConfOption.java
@@ -19,6 +19,7 @@ package org.apache.giraph.conf;
import org.apache.hadoop.conf.Configuration;
+import com.google.common.base.Objects;
import com.google.common.collect.Lists;
import java.util.List;
@@ -32,6 +33,7 @@ public class StrConfOption extends AbstractConfOption {
/**
* Constructor
+ *
* @param key key
* @param defaultValue default value
*/
@@ -45,6 +47,10 @@ public class StrConfOption extends AbstractConfOption {
return defaultValue;
}
+ @Override public boolean isDefaultValue(Configuration conf) {
+ return Objects.equal(get(conf), defaultValue);
+ }
+
@Override public String getDefaultValueStr() {
return defaultValue;
}
@@ -55,6 +61,7 @@ public class StrConfOption extends AbstractConfOption {
/**
* Lookup value
+ *
* @param conf Configuration
* @return value for key, or defaultValue
*/
@@ -64,6 +71,7 @@ public class StrConfOption extends AbstractConfOption {
/**
* Lookup value with user defined defaultValue
+ *
* @param conf Configuration
* @param defaultVal default value to use
* @return value for key, or defaultVal passed in
@@ -74,6 +82,7 @@ public class StrConfOption extends AbstractConfOption {
/**
* Get array of values for key
+ *
* @param conf Configuration
* @return array of values for key
*/
@@ -83,6 +92,7 @@ public class StrConfOption extends AbstractConfOption {
/**
* Get list of values for key
+ *
* @param conf Configuration
* @return list of values for key
*/
@@ -92,6 +102,7 @@ public class StrConfOption extends AbstractConfOption {
/**
* Set value for key
+ *
* @param conf Configuration
* @param value to set
*/
@@ -101,6 +112,7 @@ public class StrConfOption extends AbstractConfOption {
/**
* Set value if not already present
+ *
* @param conf Configuration
* @param value to set
*/
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/conf/TypesHolder.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/TypesHolder.java b/giraph-core/src/main/java/org/apache/giraph/conf/TypesHolder.java
new file mode 100644
index 0000000..0deb79f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/TypesHolder.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.conf;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Interface for classes that are parameterized by all of the Giraph types.
+ * These classes can be used to infer the types used by {@link GiraphTypes}.
+ *
+ * @param <I> Vertex ID
+ * @param <V> Vertex Value
+ * @param <E> Edge Value
+ * @param <M1> Incoming Message Value
+ * @param <M2> Outgoing Message Value
+ */
+public interface TypesHolder<I extends WritableComparable,
+ V extends Writable, E extends Writable, M1 extends Writable,
+ M2 extends Writable> {
+ // Marker interface: only the type arguments are read, no methods needed
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java b/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java
index 84158df..87d5879 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/Computation.java
@@ -20,6 +20,7 @@ package org.apache.giraph.graph;
import org.apache.giraph.comm.WorkerClientRequestProcessor;
import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.TypesHolder;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.OutEdges;
import org.apache.giraph.worker.WorkerAggregatorUsage;
@@ -27,6 +28,7 @@ import org.apache.giraph.worker.WorkerContext;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.log4j.Logger;
import java.io.IOException;
@@ -53,7 +55,10 @@ public abstract class Computation<I extends WritableComparable,
V extends Writable, E extends Writable, M1 extends Writable,
M2 extends Writable>
extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
- implements WorkerAggregatorUsage {
+ implements TypesHolder<I, V, E, M1, M2>, WorkerAggregatorUsage {
+ /** Logger */
+ private static final Logger LOG = Logger.getLogger(Computation.class);
+
/** Global graph state **/
private GraphState graphState;
/** Handles requests */
@@ -101,7 +106,7 @@ public abstract class Computation<I extends WritableComparable,
* @param workerAggregatorUsage Worker aggregator usage
* @param workerContext Worker context
*/
- public final void initialize(
+ public void initialize(
GraphState graphState,
WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor,
GraphTaskManager<I, V, E> graphTaskManager,
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/graph/ComputationFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputationFactory.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputationFactory.java
new file mode 100644
index 0000000..d23db05
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputationFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.graph;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Factory for creating Computations.
+ *
+ * @param <I> Vertex ID
+ * @param <V> Vertex Value
+ * @param <E> Edge Value
+ * @param <M1> Incoming Message Value
+ * @param <M2> Outgoing Message Value
+ */
+public interface ComputationFactory<I extends WritableComparable,
+ V extends Writable, E extends Writable, M1 extends Writable,
+ M2 extends Writable> {
+ /**
+ * One time initialization before compute calls.
+ * Guaranteed to be called from only one thread before computation begins.
+ *
+ * @param conf Configuration
+ */
+ void initComputation(ImmutableClassesGiraphConfiguration<I, V, E> conf);
+
+ /**
+ * Get a Computation object for the given configuration
+ *
+ * @param conf Configuration
+ * @return Computation to use
+ */
+ Computation<I, V, E, M1, M2> getComputation(
+ ImmutableClassesGiraphConfiguration<I, V, E> conf);
+
+ /**
+ * Check that the Configuration passed in is set up correctly to run a job.
+ *
+ * @param conf Configuration to check.
+ */
+ void checkConfiguration(
+ ImmutableClassesGiraphConfiguration<I, V, E> conf);
+
+ /**
+ * Get the name of this particular computation
+ *
+ * @param conf Configuration
+ * @return String name of computation
+ */
+ String computationName(GiraphConfiguration conf);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/graph/DefaultComputationFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/DefaultComputationFactory.java b/giraph-core/src/main/java/org/apache/giraph/graph/DefaultComputationFactory.java
new file mode 100644
index 0000000..405b272
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/DefaultComputationFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.graph;
+
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Default computation factory that instantiates the configured Java class
+ *
+ * @param <I> Vertex ID
+ * @param <V> Vertex Value
+ * @param <E> Edge Value
+ */
+public class DefaultComputationFactory<I extends WritableComparable,
+ V extends Writable, E extends Writable>
+ implements ComputationFactory<I, V, E, Writable, Writable> {
+ @Override
+ public void initComputation(
+ ImmutableClassesGiraphConfiguration<I, V, E> conf) {
+ // No one-time setup is performed for plain Java computations
+ }
+
+ @Override
+ public Computation<I, V, E, Writable, Writable> getComputation(
+ ImmutableClassesGiraphConfiguration<I, V, E> conf) {
+ Class<? extends Computation> klass = conf.getComputationClass();
+ return ReflectionUtils.newInstance(klass, conf);
+ }
+
+ @Override
+ public void checkConfiguration(
+ ImmutableClassesGiraphConfiguration<I, V, E> conf) {
+ if (conf.getComputationClass() == null) {
+ throw new IllegalArgumentException("checkConfiguration: Null " +
+ GiraphConstants.COMPUTATION_CLASS.getKey());
+ }
+ }
+
+ @Override
+ public String computationName(GiraphConfiguration conf) {
+ return conf.getComputationClass().getSimpleName();
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
index 99b28df..435dd87 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java
@@ -40,7 +40,6 @@ import org.apache.giraph.time.Time;
import org.apache.giraph.utils.CallableFactory;
import org.apache.giraph.utils.MemoryUtils;
import org.apache.giraph.utils.ProgressableUtils;
-import org.apache.giraph.utils.ReflectionUtils;
import org.apache.giraph.worker.BspServiceWorker;
import org.apache.giraph.worker.InputSplitsCallable;
import org.apache.giraph.worker.WorkerContext;
@@ -58,7 +57,6 @@ import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import java.io.IOException;
-import java.lang.reflect.Type;
import java.net.URL;
import java.net.URLDecoder;
import java.util.ArrayList;
@@ -70,12 +68,6 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
-import static org.apache.giraph.conf.GiraphConstants.EDGE_VALUE_CLASS;
-import static org.apache.giraph.conf.GiraphConstants.INCOMING_MESSAGE_VALUE_CLASS;
-import static org.apache.giraph.conf.GiraphConstants.VERTEX_ID_CLASS;
-import static org.apache.giraph.conf.GiraphConstants.VERTEX_VALUE_CLASS;
-import static org.apache.giraph.conf.GiraphConstants.OUTGOING_MESSAGE_VALUE_CLASS;
-
/**
* The Giraph-specific business logic for a single BSP
* compute node in whatever underlying type of cluster
@@ -175,13 +167,18 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
public void setup(Path[] zkPathList)
throws IOException, InterruptedException {
context.setStatus("setup: Beginning worker setup.");
- conf = new ImmutableClassesGiraphConfiguration<I, V, E>(
- context.getConfiguration());
- determineClassTypes(conf);
+ Configuration hadoopConf = context.getConfiguration();
+ conf = new ImmutableClassesGiraphConfiguration<I, V, E>(hadoopConf);
+ // Write user's graph types (I,V,E,M1,M2) to configuration parameters
+ // (only where unset) for quicker access later. These types are often
+ // inferred from the Computation class used.
+ conf.getGiraphTypes().writeIfUnset(conf);
// configure global logging level for Giraph job
initializeAndConfigureLogging();
// init the metrics objects
setupAndInitializeGiraphMetrics();
+ // One time setup for computation factory
+ conf.createComputationFactory().initComputation(conf);
// Do some task setup (possibly starting up a Zookeeper service)
context.setStatus("setup: Initializing Zookeeper services.");
locateZookeeperClasspath(zkPathList);
@@ -437,32 +434,6 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable,
return serviceWorker.getWorkerContext();
}
- /**
- * Set the concrete, user-defined choices about generic methods
- * (validated earlier in GiraphRunner) into the Configuration.
- * @param conf the Configuration object for this job run.
- */
- public void determineClassTypes(Configuration conf) {
- ImmutableClassesGiraphConfiguration giraphConf =
- new ImmutableClassesGiraphConfiguration(conf);
- Class<? extends Computation<I, V, E, Writable, Writable>> computationClass =
- giraphConf.getComputationClass();
- List<Class<?>> classList = ReflectionUtils.<Computation>getTypeArguments(
- Computation.class, computationClass);
- Type vertexIndexType = classList.get(0);
- Type vertexValueType = classList.get(1);
- Type edgeValueType = classList.get(2);
- Type incomingMessageValueType = classList.get(3);
- Type outgoingMessageValueType = classList.get(4);
- VERTEX_ID_CLASS.set(conf, (Class<WritableComparable>) vertexIndexType);
- VERTEX_VALUE_CLASS.set(conf, (Class<Writable>) vertexValueType);
- EDGE_VALUE_CLASS.set(conf, (Class<Writable>) edgeValueType);
- INCOMING_MESSAGE_VALUE_CLASS.set(conf,
- (Class<Writable>) incomingMessageValueType);
- OUTGOING_MESSAGE_VALUE_CLASS.set(conf,
- (Class<Writable>) outgoingMessageValueType);
- }
-
/**
* Copied from JobConf to get the location of this jar. Workaround for
* things like Oozie map-reduce jobs. NOTE: Pure YARN profile cannot
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/graph/Language.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/Language.java b/giraph-core/src/main/java/org/apache/giraph/graph/Language.java
new file mode 100644
index 0000000..29e0a84
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/Language.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.graph;
+
+/**
+ * Programming language that a component (e.g. a Computation) is written in
+ */
+public enum Language {
+ /** Java */
+ JAVA,
+ /** Jython (Python running on the JVM) */
+ JYTHON
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntNullTextInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntNullTextInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntNullTextInputFormat.java
index 91725f8..b15f222 100644
--- a/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntNullTextInputFormat.java
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/IntIntNullTextInputFormat.java
@@ -18,7 +18,6 @@
package org.apache.giraph.io.formats;
-import com.google.common.collect.Lists;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.EdgeFactory;
import org.apache.hadoop.io.IntWritable;
@@ -27,6 +26,8 @@ import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import com.google.common.collect.Lists;
+
import java.io.IOException;
import java.util.List;
import java.util.regex.Pattern;
@@ -46,13 +47,13 @@ public class IntIntNullTextInputFormat extends
public TextVertexReader createVertexReader(InputSplit split,
TaskAttemptContext context)
throws IOException {
- return new IntIntNullIntVertexReader();
+ return new IntIntNullVertexReader();
}
/**
* Vertex reader associated with {@link IntIntNullTextInputFormat}.
*/
- public class IntIntNullIntVertexReader extends
+ public class IntIntNullVertexReader extends
TextVertexReaderFromEachLineProcessed<String[]> {
/**
* Cached vertex id for the current line
http://git-wip-us.apache.org/repos/asf/giraph/blob/8f89bd85/giraph-core/src/main/java/org/apache/giraph/io/formats/LongLongNullTextInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/LongLongNullTextInputFormat.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/LongLongNullTextInputFormat.java
new file mode 100644
index 0000000..4d47862
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/LongLongNullTextInputFormat.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.io.formats;
+
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.EdgeFactory;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * Simple text-based {@link org.apache.giraph.io.VertexInputFormat} for
+ * unweighted graphs with long ids. The vertex value is a copy of its id.
+ *
+ * Each line: vertex neighbor1 neighbor2 ... (separated by one tab or space)
+ */
+public class LongLongNullTextInputFormat extends
+    TextVertexInputFormat<LongWritable, LongWritable, NullWritable> {
+  /** Separator of the vertex and neighbors: exactly one tab or space */
+  private static final Pattern SEPARATOR = Pattern.compile("[\t ]");
+
+  @Override
+  public TextVertexReader createVertexReader(InputSplit split,
+      TaskAttemptContext context)
+    throws IOException {
+    return new LongLongNullVertexReader();
+  }
+
+  /**
+   * Vertex reader associated with {@link LongLongNullTextInputFormat}.
+   */
+  public class LongLongNullVertexReader extends
+      TextVertexReaderFromEachLineProcessed<String[]> {
+    /** Vertex id parsed from the first token of the current line */
+    private LongWritable id;
+
+    @Override
+    protected String[] preprocessLine(Text line) throws IOException {
+      String[] tokens = SEPARATOR.split(line.toString());
+      id = new LongWritable(Long.parseLong(tokens[0]));
+      return tokens;
+    }
+
+    @Override
+    protected LongWritable getId(String[] tokens) throws IOException {
+      return id;
+    }
+
+    @Override
+    protected LongWritable getValue(String[] tokens) throws IOException {
+      return new LongWritable(id.get()); // own copy: value must not alias id
+    }
+
+    @Override
+    protected Iterable<Edge<LongWritable, NullWritable>> getEdges(
+        String[] tokens) throws IOException {
+      List<Edge<LongWritable, NullWritable>> edges =
+          Lists.newArrayListWithCapacity(tokens.length - 1);
+      for (int n = 1; n < tokens.length; n++) { // tokens[0] is the vertex id
+        edges.add(EdgeFactory.create(
+            new LongWritable(Long.parseLong(tokens[n]))));
+      }
+      return edges;
+    }
+  }
+}