You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2012/06/15 19:38:54 UTC

svn commit: r1350715 - in /hama/trunk: ./ examples/src/main/java/org/apache/hama/examples/ examples/src/main/java/org/apache/hama/examples/util/ examples/src/test/java/org/apache/hama/examples/

Author: tjungblut
Date: Fri Jun 15 17:38:54 2012
New Revision: 1350715

URL: http://svn.apache.org/viewvc?rev=1350715&view=rev
Log:
[HAMA-550]: Implementation of Bipartite Matching


Added:
    hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/util/
    hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextPair.java
    hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java
Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1350715&r1=1350714&r2=1350715&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Fri Jun 15 17:38:54 2012
@@ -3,7 +3,8 @@ Hama Change Log
 Release 0.5 - April 10, 2012 
 
   NEW FEATURES
-
+ 
+   HAMA-550: Implementation of Bipartite Matching (Apurv Verma via tjungblut)
    HAMA-588: Add voteToHalt() mechanism in Graph API (edwardyoon)
    HAMA-566: Add disk-based queue (tjungblut)
    HAMA-552: Add a sorted message queue (tjungblut)   

Added: hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java?rev=1350715&view=auto
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java (added)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java Fri Jun 15 17:38:54 2012
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.lang.math.RandomUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.HashPartitioner;
+import org.apache.hama.bsp.TextInputFormat;
+import org.apache.hama.bsp.TextOutputFormat;
+import org.apache.hama.examples.util.TextPair;
+import org.apache.hama.graph.Edge;
+import org.apache.hama.graph.GraphJob;
+import org.apache.hama.graph.Vertex;
+import org.apache.hama.graph.VertexInputReader;
+
+import com.google.common.base.Objects;
+
+/**
+ * Randomized Matching Algorithm for bipartite matching in the Pregel Model.
+ * 
+ */
+public final class BipartiteMatching {
+
+  private static final Text UNMATCHED = new Text("U");
+  public static final String SEED_CONFIGURATION_KEY = "bipartite.matching.seed";
+
+  /**
+   * Vertex implementation of the randomized matching. A vertex that is
+   * already matched votes to halt; otherwise it acts according to the
+   * superstep count modulo four:
+   * <ol>
+   * <li>0: a LEFT vertex sends a request to all of its neighbors.</li>
+   * <li>1: a RIGHT vertex answers one randomly chosen request.</li>
+   * <li>2: a LEFT vertex picks one randomly chosen answer, records the sender
+   * as its match and notifies it.</li>
+   * <li>3: a RIGHT vertex that received a notification records the sender as
+   * its match.</li>
+   * </ol>
+   * The vertex value is a {@link TextPair} of (match vertex, component); every
+   * message is a {@link TextPair} whose first element is the sender's id.
+   */
+  public static class BipartiteMatchingVertex extends
+      Vertex<Text, NullWritable, TextPair> {
+
+    // Components
+    private final static Text LEFT = new Text("L");
+    private final static Text RIGHT = new Text("R");
+
+    // Number of supersteps that make up one matching round.
+    private final static int PHASES_PER_ROUND = 4;
+
+    // Needed because Vertex value and message sent have same types.
+    private TextPair reusableMessage;
+    private Random random;
+
+    /**
+     * Builds the message this vertex sends in every phase (its own id plus a
+     * constant second element) and seeds the random generator from
+     * {@link BipartiteMatching#SEED_CONFIGURATION_KEY}, falling back to the
+     * current time when no seed is configured.
+     */
+    @Override
+    public void setup(Configuration conf) {
+      reusableMessage = new TextPair(new Text(getVertexID()), new Text("1"))
+          .setNames("SourceVertex", "Vestige");
+      random = new Random(Long.parseLong(getConf().get(SEED_CONFIGURATION_KEY,
+          System.currentTimeMillis() + "")));
+    }
+
+    /**
+     * Runs the phase of the matching round that belongs to the current
+     * superstep. Does nothing but vote to halt once this vertex is matched.
+     */
+    @Override
+    public void compute(Iterator<TextPair> messages) throws IOException {
+
+      if (isMatched()) {
+        voteToHalt();
+        return;
+      }
+
+      // Take the modulo on the full-width count and narrow afterwards; casting
+      // first would truncate a large count before the modulo is applied.
+      final int phase = (int) (getSuperstepCount() % PHASES_PER_ROUND);
+      final Text component = getComponent();
+
+      switch (phase) {
+        case 0:
+          if (Objects.equal(component, LEFT)) {
+            sendMessageToNeighbors(getNewMessage());
+          }
+          break;
+
+        case 1:
+          if (Objects.equal(component, RIGHT)) {
+            TextPair luckyMsg = pickRandomMessage(messages);
+            if (luckyMsg != null) {
+              sendMessage(getSourceVertex(luckyMsg), getNewMessage());
+            }
+          }
+          break;
+
+        case 2:
+          if (Objects.equal(component, LEFT)) {
+            TextPair luckyMsg = pickRandomMessage(messages);
+            if (luckyMsg != null) {
+              Text sourceVertex = getSourceVertex(luckyMsg);
+              setMatchVertex(sourceVertex);
+              sendMessage(sourceVertex, getNewMessage());
+            }
+          }
+          break;
+
+        case 3:
+          if (Objects.equal(component, RIGHT) && messages.hasNext()) {
+            setMatchVertex(getSourceVertex(messages.next()));
+          }
+          break;
+
+        default:
+          break;
+      }
+    }
+
+    /**
+     * Drains the given messages and returns one of them picked with this
+     * vertex's random generator, or null when there are none. The generator
+     * is only consulted when at least one message arrived.
+     */
+    private TextPair pickRandomMessage(Iterator<TextPair> messages) {
+      List<TextPair> buffer = new ArrayList<TextPair>();
+      while (messages.hasNext()) {
+        buffer.add(messages.next());
+      }
+      if (buffer.isEmpty()) {
+        return null;
+      }
+      return buffer.get(RandomUtils.nextInt(random, buffer.size()));
+    }
+
+    /**
+     * Finds the vertex from which "msg" came.
+     */
+    private static Text getSourceVertex(TextPair msg) {
+      return msg.getFirst();
+    }
+
+    /**
+     * Pairs "this" vertex with the "matchVertex"
+     */
+    private void setMatchVertex(Text matchVertex) {
+      // Store a copy: TextPair.readFields overwrites its Text fields in place,
+      // so keeping the Text owned by a message would alias the two objects.
+      getValue().setFirst(new Text(matchVertex));
+    }
+
+    /**
+     * Returns the message to send; the same instance is handed out every time.
+     */
+    private TextPair getNewMessage() {
+      return reusableMessage;
+    }
+
+    /**
+     * Returns the component{LEFT/RIGHT} to which this vertex belongs.
+     */
+    private Text getComponent() {
+      return getValue().getSecond();
+    }
+
+    /**
+     * Tells whether this vertex has been paired, i.e. whether the first
+     * element of its value differs from the UNMATCHED marker.
+     */
+    private boolean isMatched() {
+      return !getValue().getFirst().equals(UNMATCHED);
+    }
+
+  }
+
+  /**
+   * Parses one line of the input graph into a vertex.
+   * <p>
+   * Input graph is given as<br/>
+   * {@code <Vertex> <component value>: <adjacent_vertex_1> <adjacent_vertex_2> ..}
+   * <br/>
+   * A L:B D<br/>
+   * B R:A C<br/>
+   * C L:B D<br/>
+   * D R:A C<br/>
+   * <p>
+   * A line whose neighbor list is missing or blank yields a vertex without
+   * edges. A line that lacks the vertex id or the component is rejected with
+   * an {@link IllegalArgumentException}.
+   */
+  public static class BipartiteMatchingVertexReader extends
+      VertexInputReader<LongWritable, Text, Text, NullWritable, TextPair> {
+
+    @Override
+    public boolean parseVertex(LongWritable key, Text value,
+        Vertex<Text, NullWritable, TextPair> vertex) {
+
+      String line = value.toString();
+      String[] tokenArray = line.split(":");
+      String[] selfArray = tokenArray[0].trim().split("\\s+");
+      if (selfArray.length < 2) {
+        throw new IllegalArgumentException(
+            "Expected '<vertex> <component>:<neighbors>' but got: " + line);
+      }
+
+      vertex.setVertexID(new Text(selfArray[0]));
+      // initially a node is unmatched, which is denoted by U.
+      // The marker is copied because TextPair.readFields overwrites its Text
+      // fields in place; sharing the static instance would let one vertex
+      // value corrupt the marker for all others.
+      vertex.setValue(new TextPair(new Text(UNMATCHED), new Text(selfArray[1]))
+          .setNames("MatchVertex", "Component"));
+
+      // split(":") drops a trailing empty part, so "A L:" has no second token.
+      if (tokenArray.length > 1) {
+        for (String adjNode : tokenArray[1].trim().split("\\s+")) {
+          // A blank neighbor list splits into one empty token; skip it.
+          if (adjNode.length() > 0) {
+            vertex.addEdge(new Edge<Text, NullWritable>(new Text(adjNode),
+                null));
+          }
+        }
+      }
+      return true;
+    }
+  }
+
+  /**
+   * Prints the command line synopsis to standard output and terminates the
+   * JVM with exit code -1.
+   */
+  private static void printUsage() {
+    final String optionalArgs = "[maximum iterations (default 30)] [tasks] [seed]";
+    System.out.println("Usage: <input> <output> " + optionalArgs);
+    System.exit(-1);
+  }
+
+  /**
+   * Configures and runs the bipartite matching job and prints the elapsed
+   * time when it completes successfully.
+   *
+   * @param args {@code <input> <output> [maximum iterations] [tasks] [seed]};
+   *          the usage is printed and the JVM exits when fewer than two
+   *          arguments are given
+   */
+  public static void main(String... args) throws IOException,
+      InterruptedException, ClassNotFoundException {
+
+    if (args.length < 2) {
+      printUsage();
+    }
+
+    HamaConfiguration conf = new HamaConfiguration(new Configuration());
+    GraphJob job = new GraphJob(conf, BipartiteMatching.class);
+
+    // set the defaults
+    job.setMaxIteration(30);
+    job.setNumBspTask(2);
+    conf.set(SEED_CONFIGURATION_KEY, System.currentTimeMillis() + "");
+
+    // Optional positional arguments override the defaults. Each check uses
+    // ">=" so that surplus trailing arguments never disable an earlier one
+    // (the seed used to be ignored unless exactly five arguments were given).
+    if (args.length >= 5) {
+      conf.set(SEED_CONFIGURATION_KEY, args[4]);
+    }
+    if (args.length >= 4) {
+      job.setNumBspTask(Integer.parseInt(args[3]));
+    }
+    if (args.length >= 3) {
+      job.setMaxIteration(Integer.parseInt(args[2]));
+    }
+
+    job.setJobName("BipartiteMatching");
+    job.setInputPath(new Path(args[0]));
+    job.setOutputPath(new Path(args[1]));
+
+    job.setVertexClass(BipartiteMatchingVertex.class);
+    job.setVertexIDClass(Text.class);
+    job.setVertexValueClass(TextPair.class);
+    job.setEdgeValueClass(NullWritable.class);
+
+    job.setInputKeyClass(LongWritable.class);
+    job.setInputValueClass(Text.class);
+    job.setInputFormat(TextInputFormat.class);
+    job.setVertexInputReaderClass(BipartiteMatchingVertexReader.class);
+    job.setPartitioner(HashPartitioner.class);
+    job.setOutputFormat(TextOutputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(TextPair.class);
+
+    long startTime = System.currentTimeMillis();
+    if (job.waitForCompletion(true)) {
+      System.out.println("Job Finished in "
+          + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
+    }
+  }
+
+}

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java?rev=1350715&r1=1350714&r2=1350715&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java Fri Jun 15 17:38:54 2012
@@ -34,6 +34,8 @@ public class ExampleDriver {
       pgd.addClass("bench", RandBench.class, "Random Benchmark");
       pgd.addClass("pagerank", PageRank.class, "PageRank");
       pgd.addClass("inlnkcount", InlinkCount.class, "InlinkCount");
+      pgd.addClass("bipartite", BipartiteMatching.class, 
+          "Bipartite Matching");
       pgd.driver(args);
     } catch (Throwable e) {
       e.printStackTrace();

Added: hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextPair.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextPair.java?rev=1350715&view=auto
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextPair.java (added)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextPair.java Fri Jun 15 17:38:54 2012
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples.util;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+import com.google.common.base.Objects;
+
+/**
+ * A mutable pair of {@link Text} values for use in the BipartiteMatching
+ * algorithm.
+ * <p>
+ * Only the two values are serialized by {@link #write(DataOutput)}; the
+ * attribute names used by {@link #toString()} belong to the instance and
+ * default to "First" and "Second" until {@link #setNames(String, String)} is
+ * called. Equality and hash code consider the two values only.
+ */
+public final class TextPair implements Writable {
+
+  Text first;
+  Text second;
+
+  String nameFirst = "First";
+  String nameSecond = "Second";
+
+  /**
+   * Creates a pair of two empty texts.
+   */
+  public TextPair() {
+    first = new Text();
+    second = new Text();
+  }
+
+  /**
+   * Creates a pair that holds the given instances (they are not copied).
+   */
+  public TextPair(Text first, Text second) {
+    this.first = first;
+    this.second = second;
+  }
+
+  /**
+   * Sets the names of the attributes
+   *
+   * @return this pair, to allow chaining
+   */
+  public TextPair setNames(String nameFirst, String nameSecond) {
+    this.nameFirst = nameFirst;
+    this.nameSecond = nameSecond;
+    return this;
+  }
+
+  public Text getFirst() {
+    return first;
+  }
+
+  public void setFirst(Text first) {
+    this.first = first;
+  }
+
+  public Text getSecond() {
+    return second;
+  }
+
+  public void setSecond(Text second) {
+    this.second = second;
+  }
+
+  /**
+   * Writes the first and then the second value; the names are not written.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    first.write(out);
+    second.write(out);
+  }
+
+  /**
+   * Reads the first and then the second value into the Text instances this
+   * pair currently holds, overwriting their content in place.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    first.readFields(in);
+    second.readFields(in);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof TextPair)) {
+      return false;
+    }
+    TextPair other = (TextPair) obj;
+    return Objects.equal(first, other.first)
+        && Objects.equal(second, other.second);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(first, second);
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+        .add(nameFirst, getFirst())
+        .add(nameSecond, getSecond())
+        .toString();
+  }
+
+}

Added: hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java?rev=1350715&view=auto
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java (added)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java Fri Jun 15 17:38:54 2012
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.HamaConfiguration;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+/**
+ * Runs the BipartiteMatching example on a four-vertex graph with a fixed seed
+ * and compares the job output with the expected matching.
+ */
+public class BipartiteMatchingTest extends TestCase {
+
+  private String[] input = {
+      "A L:B D",
+      "B R:A C",
+      "C L:B D",
+      "D R:A C"
+  };
+
+  private final static String DELIMITER = "\t";
+
+  @SuppressWarnings("serial")
+  private Map<String, String> output1 = new HashMap<String, String>()
+  {{
+    put("C", "TextPair{MatchVertex=D, Component=L}");
+    put("A", "TextPair{MatchVertex=B, Component=L}");
+    put("D", "TextPair{MatchVertex=C, Component=R}");
+    put("B", "TextPair{MatchVertex=A, Component=R}");
+  }};
+
+
+  private static String INPUT = "/tmp/graph.txt";
+  private static String OUTPUT = "/tmp/graph-bipartite";
+
+  private Configuration conf = new HamaConfiguration();
+  private FileSystem fs;
+
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+    fs = FileSystem.get(conf);
+  }
+
+  /**
+   * Writes the input graph to INPUT, one vertex per line.
+   *
+   * @throws IOException if the file cannot be created or written, so that the
+   *           test fails here instead of running the job without input
+   */
+  private void generateTestData() throws IOException {
+    PrintWriter pout = new PrintWriter(new BufferedWriter(
+        new FileWriter(INPUT)));
+    try {
+      for (String line : input) {
+        pout.println(line);
+      }
+      // PrintWriter never throws on write; checkError flushes and reports.
+      if (pout.checkError()) {
+        throw new IOException("Could not write test data to " + INPUT);
+      }
+    } finally {
+      // Closing the outermost writer closes the wrapped writers as well.
+      pout.close();
+    }
+  }
+
+  /**
+   * Reads every non-empty part file of the job output and checks that each
+   * line matches the expected value and that no expected vertex is missing.
+   */
+  private void verifyResult() throws IOException {
+    FileStatus[] files = fs.globStatus(new Path(OUTPUT + "/part-*"));
+    assertNotNull("No output files found in " + OUTPUT, files);
+
+    Map<String, String> actual = new HashMap<String, String>();
+    Text key = new Text();
+    Text value = new Text();
+    for (FileStatus file : files) {
+      if (file.getLen() > 0) {
+        FSDataInputStream in = fs.open(file.getPath());
+        BufferedReader bin = new BufferedReader(new InputStreamReader(in));
+        try {
+          String s = bin.readLine();
+          while (s != null) {
+            next(key, value, s);
+            String expValue = output1.get(key.toString());
+            assertEquals(expValue, value.toString());
+            System.out.println(key + " " + value);
+            actual.put(key.toString(), value.toString());
+            s = bin.readLine();
+          }
+        } finally {
+          // Release the stream even when an assertion above fails.
+          bin.close();
+        }
+      }
+    }
+    // Guards against an empty or partial output passing unnoticed.
+    assertEquals(output1, actual);
+  }
+
+  /**
+   * Splits an output line at the tab into the given key and value.
+   */
+  private static void next(Text key, Text value, String line) {
+    String[] lineA = line.split(DELIMITER);
+    key.set(lineA[0]);
+    value.set(lineA[1]);
+  }
+
+  /**
+   * Removes the input file and the output directory. Best effort: a failure
+   * is printed but does not fail the test.
+   */
+  private void deleteTempDirs() {
+    try {
+      if (fs.exists(new Path(INPUT))) {
+        fs.delete(new Path(INPUT), true);
+      }
+      if (fs.exists(new Path(OUTPUT))) {
+        fs.delete(new Path(OUTPUT), true);
+      }
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Test
+  public void testBipartiteMatching() throws IOException, InterruptedException,
+  ClassNotFoundException {
+    try {
+      // Inside the try block so that a partially written input file is
+      // cleaned up as well.
+      generateTestData();
+      String seed = "2";
+      BipartiteMatching.main(new String[] { INPUT, OUTPUT, "30", "2",
+          seed});
+      verifyResult();
+    } finally {
+      deleteTempDirs();
+    }
+  }
+
+
+}