Posted to commits@hama.apache.org by Apache Wiki <wi...@apache.org> on 2013/09/23 07:59:07 UTC

[Hama Wiki] Update of "DynamicGraphs" by AnastasisAndronidis

Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Hama Wiki" for change notification.

The "DynamicGraphs" page has been changed by AnastasisAndronidis:
https://wiki.apache.org/hama/DynamicGraphs?action=diff&rev1=3&rev2=4

- == Dynamic Addition and Removal of Vertexes in Graph API ==
+ == Adding and Removing Vertices ==
  
- === Introduction ===
+ === Description ===
  Nowadays more and more people turn to distributed environments to store and analyze Big Data, so there is a growing need for both efficiency and new features. Hama is an emerging project that gives researchers and analysts a way to handle large amounts of data through the BSP (Bulk Synchronous Parallel) computation model.
   
  A very interesting feature of the Hama project is the Graph API for graph analysis. Many scientists and companies represent and manage their data using one or more of the many different kinds of graphs (e.g. incremental graphs).
   
  This article presents some new features of the Hama Graph API for creating dynamic graphs at run time. It also serves as a technical document on the internals of those features.
  
- === Description ===
- At the time being, we will discuss only two different operations (addition and deletion) from both points of the end-user and internal architecture.
+ === Introduction ===
+ For now, we discuss only two operations: addition and deletion. We first give an example of how to use this feature and then describe the internal implementation.
  
  Some points to keep in mind:
   1. Deletion and addition take effect after a superstep. ''We provide methods inside a vertex instance that create or delete a vertex.''
+  2. The vertex API is built on top of BSP peers. That means that each node of your cluster runs a specific number of BSP peers, and each BSP peer in turn holds multiple vertices.
-  2. New vertexes need to be distributed through partitioner to be placed on right peers.
+  3. New vertices need to be distributed through the partitioner so that they are placed on the right peers.
-  3. Keep in mind that a new created vertex and old vertexes, will have the same super step counter. ''Various algorithms change their behavior though time, an example with an internal counter to serve as a state is needed''
+  4. New and old vertices have the same superstep counter. ''Various algorithms change their behavior through time. In such a case, you need to develop your own state counter.''
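
To make points 2 and 3 more concrete, here is a minimal sketch of hash-based placement in plain Java. It is not Hama's `HashPartitioner`; the class `PartitionSketch` and the method `peerFor` are hypothetical names used only for illustration, and the real partitioner may compute the peer index differently.

```java
// Hypothetical sketch: illustrates the idea of hash partitioning only.
// It does not use or reproduce Hama's HashPartitioner.
class PartitionSketch {

  // Map a vertex ID to a peer index in the range [0, numPeers).
  // Math.floorMod keeps the result non-negative even for negative hash codes.
  static int peerFor(String vertexId, int numPeers) {
    return Math.floorMod(vertexId.hashCode(), numPeers);
  }

  public static void main(String[] args) {
    int numPeers = 3; // assumed number of BSP peers
    String[] ids = {"1", "2", "3", "4", "sum"};
    for (String id : ids) {
      // A vertex added at run time ("sum") goes through the same mapping
      // as the vertices that were read from the input.
      System.out.println(id + " -> peer " + peerFor(id, numPeers));
    }
  }
}
```

Because the mapping depends only on the vertex ID and the number of peers, any peer can work out where a newly added vertex belongs without coordinating with the others.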
+ 
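
The state counter mentioned in point 4 can be as simple as remembering when a vertex first ran. The sketch below is plain Java with hypothetical names (`AgeTrackingVertex`, `localStep`); it is not part of the Hama API and only shows one way such a counter could be derived from the shared superstep counter.

```java
// Hypothetical sketch: a per-vertex state counter derived from the
// global superstep counter. Not part of the Hama API.
class AgeTrackingVertex {

  // Global superstep in which this vertex ran compute for the first time.
  private final long birthSuperstep;

  AgeTrackingVertex(long birthSuperstep) {
    this.birthSuperstep = birthSuperstep;
  }

  // Number of supersteps elapsed since this vertex first ran.
  long localStep(long globalSuperstep) {
    return globalSuperstep - birthSuperstep;
  }

  public static void main(String[] args) {
    AgeTrackingVertex original = new AgeTrackingVertex(0); // loaded from input
    AgeTrackingVertex added = new AgeTrackingVertex(2);    // added at run time
    long global = 3;
    // Both vertices see the same global counter but have different local state.
    System.out.println("original: " + original.localStep(global));
    System.out.println("added: " + added.localStep(global));
  }
}
```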
+ === User Example ===
+ ''The following code is part of the Graph examples. You can find it in org.apache.hama.examples.DynamicGraph''
+ 
+ This is an example of how to manipulate graphs dynamically. The input of this example is a number in each row. We assume that there is a vertex with ID 1, which is responsible for creating a sum vertex that will aggregate the values of the other vertices. During the aggregation, all other vertices are deleted.
+ 
+ Input example:
+  * 1
+  * 2
+  * 3
+  * 4
+ 
+ Output example:
+  * sum  12 
+ 
+ (The output also includes the number of vertices that exist in the last superstep, obtained through two different methods, which is why the result is 12 rather than 10.)
+ 
+ 
+ {{{
+ public class DynamicGraph {
+   
+   public static class GraphTextReader extends 
+       VertexInputReader<LongWritable, Text, Text, NullWritable, IntWritable> {
+ 
+     @Override
+     public boolean parseVertex(LongWritable key, Text value,
+             Vertex<Text, NullWritable, IntWritable> vertex) throws Exception {
+ 
+         vertex.setVertexID(value);
+         vertex.setValue(new IntWritable(Integer.parseInt(value.toString())));
+ 
+         return true;
+     }
+   }
+ 
+   public static class GraphVertex extends 
+       Vertex<Text, NullWritable, IntWritable> {
+     
+     private void createSumVertex() throws IOException {
+       if (this.getVertexID().toString().equals("1")) {
+         Text new_id = new Text("sum");
+         this.addVertex(new_id, new ArrayList<Edge<Text, NullWritable>>(), new IntWritable(0));
+       }
+     }
+ 
+     private void sendAllValuesToSumAndRemove() throws IOException {
+       if (!this.getVertexID().toString().equals("sum")) {
+         this.sendMessage(new Text("sum"), this.getValue());
+         this.remove();
+       }
+     }
+ 
+     // this must run only on "sum" vertex
+     private void calculateSum(Iterable<IntWritable> msgs) throws IOException {
+       if (this.getVertexID().toString().equals("sum")) {
+         int s = 0;
+         for (IntWritable i : msgs) {
+           s += i.get();
+         }
+         s += this.getPeer().getCounter(GraphJobCounter.INPUT_VERTICES).getCounter();
+         s += this.getNumVertices();
+         this.setValue(new IntWritable(this.getValue().get() +s));
+       } else {
+         throw new UnsupportedOperationException("We have more vertices than we expected: " + this.getVertexID() + " " + this.getValue()); 
+       }
+     }
+ 
+     @Override
+     public void compute(Iterable<IntWritable> msgs) throws IOException {
+       if (this.getSuperstepCount() == 0) {
+         createSumVertex();
+       } else if (this.getSuperstepCount() == 1) {
+         sendAllValuesToSumAndRemove();
+       } else if (this.getSuperstepCount() == 2) {
+         calculateSum(msgs);
+       } else if (this.getSuperstepCount() == 3) {
+         this.voteToHalt();
+       }
+     }
+   }
+ 
+   public static void main(String[] args) throws IOException, 
+         InterruptedException, ClassNotFoundException {
+     if (args.length != 2) {
+       printUsage();
+     }
+     HamaConfiguration conf = new HamaConfiguration(new Configuration());
+     GraphJob graphJob = createJob(args, conf);
+     long startTime = System.currentTimeMillis();
+     if (graphJob.waitForCompletion(true)) {
+       System.out.println("Job Finished in "
+           + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
+     }
+   }
+ 
+   private static void printUsage() {
+     System.out.println("Usage: <input> <output>");
+     System.exit(-1);
+   }
+ 
+   private static GraphJob createJob(String[] args, HamaConfiguration conf) throws IOException {
+     GraphJob graphJob = new GraphJob(conf, DynamicGraph.class);
+     graphJob.setJobName("Dynamic Graph");
+     graphJob.setVertexClass(GraphVertex.class);
+ 
+     graphJob.setInputPath(new Path(args[0]));
+     graphJob.setOutputPath(new Path(args[1]));
+ 
+     graphJob.setVertexIDClass(Text.class);
+     graphJob.setVertexValueClass(IntWritable.class);
+     graphJob.setEdgeValueClass(NullWritable.class);
+ 
+     graphJob.setInputFormat(TextInputFormat.class);
+ 
+     graphJob.setVertexInputReaderClass(GraphTextReader.class);
+     graphJob.setPartitioner(HashPartitioner.class);
+ 
+     graphJob.setOutputFormat(TextOutputFormat.class);
+     graphJob.setOutputKeyClass(Text.class);
+     graphJob.setOutputValueClass(IntWritable.class);
+ 
+     return graphJob;
+   }  
+ 
+ }
+ }}}
+ 
+ We start by creating a class that serves as a container for our example. In this case, the class is '''DynamicGraph'''.
+ 
+ '''DynamicGraph''' contains three important parts.
+ 
+ The first component is the declaration of the input reader. This class overrides the '''parseVertex''' method and creates vertex instances from an input file. In our case the input is a text file in which each line holds a number. This number is assigned as both the ID and the value of the vertex.
+ 
+ {{{
+   public static class GraphTextReader extends 
+       VertexInputReader<LongWritable, Text, Text, NullWritable, IntWritable> {
+ 
+     @Override
+     public boolean parseVertex(LongWritable key, Text value,
+             Vertex<Text, NullWritable, IntWritable> vertex) throws Exception {
+ 
+         vertex.setVertexID(value);
+         vertex.setValue(new IntWritable(Integer.parseInt(value.toString())));
+ 
+         return true;
+     }
+   }
+ }}}
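
The effect of the reader on the sample input can be reproduced without Hama. The sketch below is plain Java with hypothetical names (`ParseSketch`, `parse`); it mirrors the "line is both ID and value" rule, and it additionally trims lines and skips blank ones, which is an assumption that the reader above does not make.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: mimics the ID/value assignment of the reader
// using plain Java collections instead of Hama's Vertex class.
class ParseSketch {

  // Turn each input line into an (ID, value) pair, keeping input order.
  static Map<String, Integer> parse(String[] lines) {
    Map<String, Integer> vertices = new LinkedHashMap<>();
    for (String line : lines) {
      String id = line.trim();
      if (id.isEmpty()) {
        continue; // assumption: blank lines are ignored
      }
      // The same text serves as the vertex ID and, parsed, as its value.
      vertices.put(id, Integer.parseInt(id));
    }
    return vertices;
  }

  public static void main(String[] args) {
    Map<String, Integer> vertices = parse(new String[] {"1", "2", "3", "4"});
    for (Map.Entry<String, Integer> e : vertices.entrySet()) {
      System.out.println("ID=" + e.getKey() + " value=" + e.getValue());
    }
  }
}
```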
+ 
+ The second component is the standard boilerplate to create and submit a '''GraphJob'''.
+ 
+ {{{
+   public static void main(String[] args) throws IOException, 
+         InterruptedException, ClassNotFoundException {
+     if (args.length != 2) {
+       printUsage();
+     }
+     HamaConfiguration conf = new HamaConfiguration(new Configuration());
+     GraphJob graphJob = createJob(args, conf);
+     long startTime = System.currentTimeMillis();
+     if (graphJob.waitForCompletion(true)) {
+       System.out.println("Job Finished in "
+           + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
+     }
+   }
+ 
+   private static void printUsage() {
+     System.out.println("Usage: <input> <output>");
+     System.exit(-1);
+   }
+ 
+   private static GraphJob createJob(String[] args, HamaConfiguration conf) throws IOException {
+     GraphJob graphJob = new GraphJob(conf, DynamicGraph.class);
+     graphJob.setJobName("Dynamic Graph");
+     graphJob.setVertexClass(GraphVertex.class);
+ 
+     graphJob.setInputPath(new Path(args[0]));
+     graphJob.setOutputPath(new Path(args[1]));
+ 
+     graphJob.setVertexIDClass(Text.class);
+     graphJob.setVertexValueClass(IntWritable.class);
+     graphJob.setEdgeValueClass(NullWritable.class);
+ 
+     graphJob.setInputFormat(TextInputFormat.class);
+ 
+     graphJob.setVertexInputReaderClass(GraphTextReader.class);
+     graphJob.setPartitioner(HashPartitioner.class);
+ 
+     graphJob.setOutputFormat(TextOutputFormat.class);
+     graphJob.setOutputKeyClass(Text.class);
+     graphJob.setOutputValueClass(IntWritable.class);
+ 
+     return graphJob;
+   }
+ }}}
+ 
+ The most important component is the '''GraphVertex''' class. This class is the heart of the program, as it contains the '''compute''' method.
+ 
+ {{{
+   public static class GraphVertex extends 
+       Vertex<Text, NullWritable, IntWritable> {
+     
+     private void createSumVertex() throws IOException {
+       if (this.getVertexID().toString().equals("1")) {
+         Text new_id = new Text("sum");
+         this.addVertex(new_id, new ArrayList<Edge<Text, NullWritable>>(), new IntWritable(0));
+       }
+     }
+ 
+     private void sendAllValuesToSumAndRemove() throws IOException {
+       if (!this.getVertexID().toString().equals("sum")) {
+         this.sendMessage(new Text("sum"), this.getValue());
+         this.remove();
+       }
+     }
+ 
+     // this must run only on "sum" vertex
+     private void calculateSum(Iterable<IntWritable> msgs) throws IOException {
+       if (this.getVertexID().toString().equals("sum")) {
+         int s = 0;
+         for (IntWritable i : msgs) {
+           s += i.get();
+         }
+         s += this.getPeer().getCounter(GraphJobCounter.INPUT_VERTICES).getCounter();
+         s += this.getNumVertices();
+         this.setValue(new IntWritable(this.getValue().get() +s));
+       } else {
+         throw new UnsupportedOperationException("We have more vertices than we expected: " + this.getVertexID() + " " + this.getValue()); 
+       }
+     }
+ 
+     @Override
+     public void compute(Iterable<IntWritable> msgs) throws IOException {
+       if (this.getSuperstepCount() == 0) {
+         createSumVertex();
+       } else if (this.getSuperstepCount() == 1) {
+         sendAllValuesToSumAndRemove();
+       } else if (this.getSuperstepCount() == 2) {
+         calculateSum(msgs);
+       } else if (this.getSuperstepCount() == 3) {
+         this.voteToHalt();
+       }
+     }
+   }
+ }}}
+ 
+ As we can see, the compute method calls three other methods, one per superstep. In the '''createSumVertex''' method, the vertex with ID ''1'' creates a new vertex with the text ''sum'' as its ID and 0 as its value. Later on, in '''sendAllValuesToSumAndRemove''', every vertex except ''sum'' sends its value to ''sum'' and then deletes itself by running ''this.remove();''. Finally, '''calculateSum''' sums the values of all vertices in the ''sum'' vertex. The interesting part of this last method is that it also adds the number of input vertices, ''this.getPeer().getCounter(GraphJobCounter.INPUT_VERTICES).getCounter()'', and the number of vertices that exist in the running superstep, ''this.getNumVertices()''.
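
The three supersteps can be traced by hand with a small simulation in plain Java. This is only a sketch: `DynamicGraphSimulation` and `run` are hypothetical names, no Hama classes are involved, and it assumes that both vertex counts equal the number of vertices still alive in the last superstep, which is inferred from the sample output of 12 rather than confirmed here.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: replays the example's supersteps on a plain map.
class DynamicGraphSimulation {

  static int run(int[] input) {
    Map<String, Integer> vertices = new LinkedHashMap<>();
    for (int v : input) {
      vertices.put(String.valueOf(v), v); // the number is both ID and value
    }

    // Superstep 0: vertex "1" asks for a new vertex "sum" with value 0.
    if (!vertices.containsKey("1")) {
      throw new IllegalArgumentException("input must contain a vertex with ID 1");
    }
    vertices.put("sum", 0);

    // Superstep 1: every vertex except "sum" sends its value and removes itself.
    List<Integer> inbox = new ArrayList<>();
    Iterator<Map.Entry<String, Integer>> it = vertices.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String, Integer> e = it.next();
      if (!e.getKey().equals("sum")) {
        inbox.add(e.getValue());
        it.remove();
      }
    }

    // Superstep 2: "sum" adds up the messages and the two vertex counts.
    int s = 0;
    for (int msg : inbox) {
      s += msg;
    }
    s += vertices.size(); // stands in for the INPUT_VERTICES counter (assumed)
    s += vertices.size(); // stands in for getNumVertices() (assumed)
    return vertices.get("sum") + s;
  }

  public static void main(String[] args) {
    System.out.println("sum " + run(new int[] {1, 2, 3, 4})); // prints "sum 12"
  }
}
```

With the sample input, the messages add up to 10 and only the ''sum'' vertex is left, so each of the two counts contributes 1.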
  
  === Implementation Details ===
  The `GraphJobRunner` class contains the major steps of the graph computation. It starts with an initial setup, where the vertices are loaded into memory; then comes the main computation loop, where the supersteps occur; and at the end there is the cleanup, where the graph and the results are written to HDFS.
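
The three phases described above can be outlined as follows. This is a schematic sketch in plain Java, not the code of `GraphJobRunner`: the names `RunnerSketch` and `SketchVertex`, the "stop when no vertex is active" rule, and the `maxSupersteps` limit are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a setup / superstep loop / cleanup structure.
class RunnerSketch {

  // Stand-in for a vertex: returns true while it wants to stay active.
  interface SketchVertex {
    boolean compute(long superstep);
  }

  static List<String> run(List<SketchVertex> loaded, int maxSupersteps) {
    List<String> log = new ArrayList<>();

    // Setup: the vertices are already in memory in this sketch.
    log.add("setup: loaded " + loaded.size() + " vertices");

    // Main loop: one iteration per superstep.
    long superstep = 0;
    boolean anyActive = true;
    while (anyActive && superstep < maxSupersteps) {
      anyActive = false;
      for (SketchVertex v : loaded) {
        anyActive |= v.compute(superstep); // every vertex runs in each superstep
      }
      log.add("superstep " + superstep);
      superstep++;
    }

    // Cleanup: the real runner writes to HDFS; here we only record the step.
    log.add("cleanup: write results");
    return log;
  }

  public static void main(String[] args) {
    List<SketchVertex> vertices = new ArrayList<>();
    vertices.add(s -> s < 3); // stays active until superstep 3, then halts
    for (String line : run(vertices, 10)) {
      System.out.println(line);
    }
  }
}
```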