You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ap...@apache.org on 2013/03/29 19:26:23 UTC
git commit: updated refs/heads/trunk to 9c47b67
Updated Branches:
refs/heads/trunk a68f2bada -> 9c47b670a
GIRAPH-577: Create a testing framework that doesn't require I/O formats (ves via apresta)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/9c47b670
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/9c47b670
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/9c47b670
Branch: refs/heads/trunk
Commit: 9c47b670a1972cb64e3a1d92ad376e6f0db10b1f
Parents: a68f2ba
Author: Alessandro Presta <al...@fb.com>
Authored: Fri Mar 29 11:26:01 2013 -0700
Committer: Alessandro Presta <al...@fb.com>
Committed: Fri Mar 29 11:26:01 2013 -0700
----------------------------------------------------------------------
CHANGELOG | 2 +
.../apache/giraph/utils/InternalVertexRunner.java | 111 +++++++++++++++
2 files changed, 113 insertions(+), 0 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/9c47b670/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 4e2036f..ee54f00 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-577: Create a testing framework that doesn't require I/O formats (ves via apresta)
+
GIRAPH-593: Update Hive IO performance improvements (nitay)
GIRAPH-594: auto set reusing objects (nitay)
http://git-wip-us.apache.org/repos/asf/giraph/blob/9c47b670/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
index 4b03127..e389e01 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
@@ -24,6 +24,8 @@ import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.job.GiraphJob;
import org.apache.giraph.io.formats.GiraphFileInputFormat;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.zookeeper.server.ServerConfig;
@@ -218,6 +220,115 @@ public class InternalVertexRunner {
}
/**
+ * Attempts to run the vertex internally in the current JVM, using an
+ * in-memory graph as input. Starts its own zookeeper instance on a
+ * background thread and deletes its temporary directory when done.
+ * @param <I> The vertex index type
+ * @param <V> The vertex type
+ * @param <E> The edge type
+ * @param <M> The message type
+ * @param classes GiraphClasses specifying which types to use
+ * @param params a map of parameters to add to the hadoop configuration
+ * @param graph input graph, handed to InMemoryVertexInputFormat
+ * @return the same graph instance that was passed in as {@code graph}
+ * @throws Exception if anything goes wrong
+ */
+ public static <I extends WritableComparable,
+ V extends Writable,
+ E extends Writable,
+ M extends Writable> TestGraph<I, V, E, M> run(
+ GiraphClasses<I, V, E, M> classes,
+ Map<String, String> params,
+ TestGraph<I, V, E, M> graph) throws Exception {
+ File tmpDir = null;
+ try {
+ // Prepare temporary folders
+ tmpDir = FileUtils.createTestDir(classes.getVertexClass());
+
+ File zkDir = FileUtils.createTempDir(tmpDir, "_bspZooKeeper");
+ File zkMgrDir = FileUtils.createTempDir(tmpDir, "_defaultZkManagerDir");
+ File checkpointsDir = FileUtils.createTempDir(tmpDir, "_checkpoints");
+
+ // Create and configure the job to run the vertex
+ GiraphJob job = new GiraphJob(classes.getVertexClass().getName());
+
+ InMemoryVertexInputFormat.setGraph(graph);
+
+ GiraphConfiguration conf = job.getConfiguration();
+ conf.setVertexClass(classes.getVertexClass());
+ conf.setVertexEdgesClass(classes.getVertexEdgesClass());
+ conf.setVertexInputFormatClass(InMemoryVertexInputFormat.class);
+ conf.setInputVertexEdgesClass(classes.getInputVertexEdgesClass());
+ conf.setVertexValueFactoryClass(classes.getVertexValueFactoryClass());
+ if (classes.hasWorkerContextClass()) {
+ conf.setWorkerContextClass(classes.getWorkerContextClass());
+ }
+ if (classes.hasCombinerClass()) {
+ conf.setVertexCombinerClass(classes.getCombinerClass());
+ }
+ if (classes.hasMasterComputeClass()) {
+ conf.setMasterComputeClass(classes.getMasterComputeClass());
+ }
+ if (classes.hasVertexInputFormat()) { // NOTE(review): replaces the in-memory format set above - confirm intended
+ conf.setVertexInputFormatClass(classes.getVertexInputFormatClass());
+ }
+ if (classes.hasEdgeInputFormat()) {
+ conf.setEdgeInputFormatClass(classes.getEdgeInputFormatClass());
+ }
+ if (classes.hasVertexOutputFormat()) {
+ conf.setVertexOutputFormatClass(classes.getVertexOutputFormatClass());
+ }
+
+ conf.setWorkerConfiguration(1, 1, 100.0f);
+ GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false);
+ GiraphConstants.LOCAL_TEST_MODE.set(conf, true);
+ conf.set(GiraphConstants.ZOOKEEPER_LIST, "localhost:" +
+ String.valueOf(LOCAL_ZOOKEEPER_PORT));
+
+ conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.toString());
+ GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf,
+ zkMgrDir.toString());
+ GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkpointsDir.toString());
+
+ for (Map.Entry<String, String> param : params.entrySet()) {
+ conf.set(param.getKey(), param.getValue());
+ }
+
+ // Configure a local zookeeper instance
+ Properties zkProperties = configLocalZooKeeper(zkDir);
+
+ QuorumPeerConfig qpConfig = new QuorumPeerConfig();
+ qpConfig.parseProperties(zkProperties);
+
+ // Create and run the zookeeper instance
+ final InternalZooKeeper zookeeper = new InternalZooKeeper();
+ final ServerConfig zkConfig = new ServerConfig();
+ zkConfig.readFrom(qpConfig);
+
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+ executorService.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ zookeeper.runFromConfig(zkConfig);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ try {
+ job.run(true);
+ } finally {
+ executorService.shutdown();
+ zookeeper.end();
+ }
+ return graph;
+ } finally {
+ FileUtils.delete(tmpDir);
+ }
+ }
+
+ /**
* Configuration options for running local ZK.
*
* @param zkDir directory for ZK to hold files in.