You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ap...@apache.org on 2013/03/29 19:26:23 UTC

git commit: updated refs/heads/trunk to 9c47b67

Updated Branches:
  refs/heads/trunk a68f2bada -> 9c47b670a


GIRAPH-577: Create a testing framework that doesn't require I/O formats (ves via apresta)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/9c47b670
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/9c47b670
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/9c47b670

Branch: refs/heads/trunk
Commit: 9c47b670a1972cb64e3a1d92ad376e6f0db10b1f
Parents: a68f2ba
Author: Alessandro Presta <al...@fb.com>
Authored: Fri Mar 29 11:26:01 2013 -0700
Committer: Alessandro Presta <al...@fb.com>
Committed: Fri Mar 29 11:26:01 2013 -0700

----------------------------------------------------------------------
 CHANGELOG                                          |    2 +
 .../apache/giraph/utils/InternalVertexRunner.java  |  111 +++++++++++++++
 2 files changed, 113 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/9c47b670/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 4e2036f..ee54f00 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-577: Create a testing framework that doesn't require I/O formats (ves via apresta)
+
   GIRAPH-593: Update Hive IO performance improvements (nitay)
 
   GIRAPH-594: auto set reusing objects (nitay)

http://git-wip-us.apache.org/repos/asf/giraph/blob/9c47b670/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
index 4b03127..e389e01 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java
@@ -24,6 +24,8 @@ import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.job.GiraphJob;
 import org.apache.giraph.io.formats.GiraphFileInputFormat;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.zookeeper.server.ServerConfig;
@@ -218,6 +220,115 @@ public class InternalVertexRunner {
   }
 
   /**
+   * Attempts to run the vertex internally in the current JVM, reading and
+   * writing to an in-memory graph. Will start its own zookeeper
+   * instance.
+   * @param <I> The vertex index type
+   * @param <V> The vertex type
+   * @param <E> The edge type
+   * @param <M> The message type
+   * @param classes GiraphClasses specifying which types to use
+   * @param params a map of parameters to add to the hadoop configuration
+   * @param graph input graph
+   * @return iterable output data
+   * @throws Exception if anything goes wrong
+   */
+  public static <I extends WritableComparable,
+    V extends Writable,
+    E extends Writable,
+    M extends Writable> TestGraph<I, V, E, M> run(
+      GiraphClasses<I, V, E, M> classes,
+      Map<String, String> params,
+      TestGraph<I, V, E, M> graph) throws Exception {
+    File tmpDir = null;
+    try {
+      // Prepare temporary folders
+      tmpDir = FileUtils.createTestDir(classes.getVertexClass());
+
+      File zkDir = FileUtils.createTempDir(tmpDir, "_bspZooKeeper");
+      File zkMgrDir = FileUtils.createTempDir(tmpDir, "_defaultZkManagerDir");
+      File checkpointsDir = FileUtils.createTempDir(tmpDir, "_checkpoints");
+
+      // Create and configure the job to run the vertex
+      GiraphJob job = new GiraphJob(classes.getVertexClass().getName());
+
+      InMemoryVertexInputFormat.setGraph(graph);
+
+      GiraphConfiguration conf = job.getConfiguration();
+      conf.setVertexClass(classes.getVertexClass());
+      conf.setVertexEdgesClass(classes.getVertexEdgesClass());
+      conf.setVertexInputFormatClass(InMemoryVertexInputFormat.class);
+      conf.setInputVertexEdgesClass(classes.getInputVertexEdgesClass());
+      conf.setVertexValueFactoryClass(classes.getVertexValueFactoryClass());
+      if (classes.hasWorkerContextClass()) {
+        conf.setWorkerContextClass(classes.getWorkerContextClass());
+      }
+      if (classes.hasCombinerClass()) {
+        conf.setVertexCombinerClass(classes.getCombinerClass());
+      }
+      if (classes.hasMasterComputeClass()) {
+        conf.setMasterComputeClass(classes.getMasterComputeClass());
+      }
+      if (classes.hasVertexInputFormat()) {
+        conf.setVertexInputFormatClass(classes.getVertexInputFormatClass());
+      }
+      if (classes.hasEdgeInputFormat()) {
+        conf.setEdgeInputFormatClass(classes.getEdgeInputFormatClass());
+      }
+      if (classes.hasVertexOutputFormat()) {
+        conf.setVertexOutputFormatClass(classes.getVertexOutputFormatClass());
+      }
+
+      conf.setWorkerConfiguration(1, 1, 100.0f);
+      GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false);
+      GiraphConstants.LOCAL_TEST_MODE.set(conf, true);
+      conf.set(GiraphConstants.ZOOKEEPER_LIST, "localhost:" +
+          String.valueOf(LOCAL_ZOOKEEPER_PORT));
+
+      conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.toString());
+      GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf,
+          zkMgrDir.toString());
+      GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkpointsDir.toString());
+
+      for (Map.Entry<String, String> param : params.entrySet()) {
+        conf.set(param.getKey(), param.getValue());
+      }
+
+      // Configure a local zookeeper instance
+      Properties zkProperties = configLocalZooKeeper(zkDir);
+
+      QuorumPeerConfig qpConfig = new QuorumPeerConfig();
+      qpConfig.parseProperties(zkProperties);
+
+      // Create and run the zookeeper instance
+      final InternalZooKeeper zookeeper = new InternalZooKeeper();
+      final ServerConfig zkConfig = new ServerConfig();
+      zkConfig.readFrom(qpConfig);
+
+      ExecutorService executorService = Executors.newSingleThreadExecutor();
+      executorService.execute(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            zookeeper.runFromConfig(zkConfig);
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      });
+      try {
+        job.run(true);
+      } finally {
+        executorService.shutdown();
+        zookeeper.end();
+      }
+      return graph;
+    } finally {
+      FileUtils.delete(tmpDir);
+    }
+  }
+
+  /**
    * Configuration options for running local ZK.
    *
    * @param zkDir directory for ZK to hold files in.