You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2012/06/15 19:38:54 UTC
svn commit: r1350715 - in /hama/trunk: ./
examples/src/main/java/org/apache/hama/examples/
examples/src/main/java/org/apache/hama/examples/util/
examples/src/test/java/org/apache/hama/examples/
Author: tjungblut
Date: Fri Jun 15 17:38:54 2012
New Revision: 1350715
URL: http://svn.apache.org/viewvc?rev=1350715&view=rev
Log:
[HAMA-550]: Implementation of Bipartite Matching
Added:
hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/util/
hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextPair.java
hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java
Modified:
hama/trunk/CHANGES.txt
hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1350715&r1=1350714&r2=1350715&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Fri Jun 15 17:38:54 2012
@@ -3,7 +3,8 @@ Hama Change Log
Release 0.5 - April 10, 2012
NEW FEATURES
-
+
+ HAMA-550: Implementation of Bipartite Matching (Apurv Verma via tjungblut)
HAMA-588: Add voteToHalt() mechanism in Graph API (edwardyoon)
HAMA-566: Add disk-based queue (tjungblut)
HAMA-552: Add a sorted message queue (tjungblut)
Added: hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java?rev=1350715&view=auto
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java (added)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java Fri Jun 15 17:38:54 2012
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.commons.lang.math.RandomUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.HashPartitioner;
+import org.apache.hama.bsp.TextInputFormat;
+import org.apache.hama.bsp.TextOutputFormat;
+import org.apache.hama.examples.util.TextPair;
+import org.apache.hama.graph.Edge;
+import org.apache.hama.graph.GraphJob;
+import org.apache.hama.graph.Vertex;
+import org.apache.hama.graph.VertexInputReader;
+
+import com.google.common.base.Objects;
+
+/**
+ * Randomized Matching Algorithm for bipartite matching in the Pregel Model.
+ *
+ */
+public final class BipartiteMatching {
+
+ private static final Text UNMATCHED = new Text("U");
+ public static final String SEED_CONFIGURATION_KEY = "bipartite.matching.seed";
+
+  /**
+   * Vertex of the randomized bipartite matching. The vertex value is a
+   * {@link TextPair}: its first element is the matched partner (equal to
+   * {@code UNMATCHED} while the vertex has no partner) and its second element
+   * is the component the vertex belongs to ("L" or "R").
+   * <p>
+   * One round of the algorithm takes four supersteps:
+   * <ol>
+   * <li>every unmatched LEFT vertex sends a message to all its neighbors,</li>
+   * <li>every unmatched RIGHT vertex answers one randomly chosen sender,</li>
+   * <li>every unmatched LEFT vertex picks one randomly chosen answer, records
+   * that sender as its match and notifies it,</li>
+   * <li>every unmatched RIGHT vertex that got a notification records the
+   * sender as its match.</li>
+   * </ol>
+   * A vertex that is already matched votes to halt.
+   */
+  public static class BipartiteMatchingVertex extends
+      Vertex<Text, NullWritable, TextPair> {
+
+    // Components
+    private final static Text LEFT = new Text("L");
+    private final static Text RIGHT = new Text("R");
+
+    // Needed because Vertex value and message sent have same types.
+    private TextPair reusableMessage;
+    private Random random;
+
+    /**
+     * Prepares the message this vertex sends (its own id plus a placeholder)
+     * and the random generator. The generator is seeded from
+     * {@code SEED_CONFIGURATION_KEY}, falling back to the current time when
+     * the key is not set.
+     */
+    @Override
+    public void setup(Configuration conf) {
+      reusableMessage = new TextPair(new Text(getVertexID()), new Text("1"))
+          .setNames("SourceVertex", "Vestige");
+      random = new Random(Long.parseLong(getConf().get(SEED_CONFIGURATION_KEY,
+          System.currentTimeMillis() + "")));
+    }
+
+    /**
+     * Executes the phase of the matching round that corresponds to the
+     * current superstep.
+     *
+     * @param messages the messages delivered to this vertex in this superstep
+     */
+    @Override
+    public void compute(Iterator<TextPair> messages) throws IOException {
+
+      if (isMatched()) {
+        voteToHalt();
+        return;
+      }
+
+      // Take the remainder before narrowing, so the phase stays in 0..3 even
+      // if the superstep counter does not fit into an int.
+      switch ((int) (getSuperstepCount() % 4)) {
+        case 0:
+          if (Objects.equal(getComponent(), LEFT)) {
+            sendMessageToNeighbors(getNewMessage());
+          }
+          break;
+
+        case 1:
+          if (Objects.equal(getComponent(), RIGHT)) {
+            TextPair luckyMsg = pickRandomMessage(messages);
+            if (luckyMsg != null) {
+              sendMessage(getSourceVertex(luckyMsg), getNewMessage());
+            }
+          }
+          break;
+
+        case 2:
+          if (Objects.equal(getComponent(), LEFT)) {
+            TextPair luckyMsg = pickRandomMessage(messages);
+            if (luckyMsg != null) {
+              Text sourceVertex = getSourceVertex(luckyMsg);
+              setMatchVertex(sourceVertex);
+              sendMessage(sourceVertex, getNewMessage());
+            }
+          }
+          break;
+
+        case 3:
+          if (Objects.equal(getComponent(), RIGHT)) {
+            if (messages.hasNext()) {
+              setMatchVertex(getSourceVertex(messages.next()));
+            }
+          }
+          break;
+
+        default:
+          break;
+      }
+    }
+
+    /**
+     * Drains the iterator and returns one of the messages chosen uniformly at
+     * random, or {@code null} when there are none. The random generator is
+     * only consulted when at least one message exists.
+     */
+    private TextPair pickRandomMessage(Iterator<TextPair> messages) {
+      List<TextPair> buffer = new ArrayList<TextPair>();
+      while (messages.hasNext()) {
+        buffer.add(messages.next());
+      }
+      if (buffer.isEmpty()) {
+        return null;
+      }
+      return buffer.get(RandomUtils.nextInt(random, buffer.size()));
+    }
+
+    /**
+     * Finds the vertex from which "msg" came.
+     */
+    private static Text getSourceVertex(TextPair msg) {
+      return msg.getFirst();
+    }
+
+    /**
+     * Pairs "this" vertex with the "matchVertex". A copy is stored so the
+     * vertex value does not share a Text instance with the received message.
+     */
+    private void setMatchVertex(Text matchVertex) {
+      getValue().setFirst(new Text(matchVertex));
+    }
+
+    /**
+     * Returns the message identifying this vertex as the sender.
+     */
+    private TextPair getNewMessage() {
+      return reusableMessage;
+    }
+
+    /**
+     * Returns the component{LEFT/RIGHT} to which this vertex belongs.
+     */
+    private Text getComponent() {
+      return getValue().getSecond();
+    }
+
+    /**
+     * Tells whether this vertex already has a partner.
+     */
+    private boolean isMatched() {
+      return !getValue().getFirst().equals(UNMATCHED);
+    }
+
+  }
+
+  /**
+   * Parses one line of the input graph into a vertex. The expected layout is
+   *
+   * <pre>
+   * &lt;vertex&gt; &lt;component&gt;:&lt;adjacent_vertex_1&gt; &lt;adjacent_vertex_2&gt; ..
+   * A L:B D
+   * B R:A C
+   * C L:B D
+   * D R:A C
+   * </pre>
+   *
+   * The part after the colon may be empty or missing, in which case the
+   * vertex gets no edges.
+   */
+  public static class BipartiteMatchingVertexReader extends
+      VertexInputReader<LongWritable, Text, Text, NullWritable, TextPair> {
+
+    /**
+     * Fills the given vertex with the id, the initial (unmatched) value and
+     * the edges described by the line.
+     *
+     * @param key the input key of the line (not used)
+     * @param value the text of the line
+     * @param vertex the vertex to populate
+     * @return always {@code true}
+     * @throws IllegalArgumentException if the line does not start with a
+     *           vertex id followed by a component
+     */
+    @Override
+    public boolean parseVertex(LongWritable key, Text value,
+        Vertex<Text, NullWritable, TextPair> vertex) {
+
+      // String.split drops trailing empty strings, so "A L:" yields a single
+      // token; the neighbor part is therefore treated as optional below.
+      String[] tokenArray = value.toString().split(":");
+      String[] selfArray = tokenArray[0].trim().split("\\s+");
+      if (selfArray.length < 2) {
+        throw new IllegalArgumentException(
+            "Malformed input line, expected '<vertex> <component>:<neighbors>' but got: "
+                + value);
+      }
+
+      vertex.setVertexID(new Text(selfArray[0]));
+      // initially a node is unmatched, which is denoted by U. Every vertex
+      // gets its own copy so the shared constant can never be modified
+      // through a vertex value.
+      vertex.setValue(new TextPair(new Text(UNMATCHED), new Text(selfArray[1]))
+          .setNames("MatchVertex", "Component"));
+
+      if (tokenArray.length > 1) {
+        for (String adjNode : tokenArray[1].trim().split("\\s+")) {
+          // an empty neighbor list still produces one empty token
+          if (adjNode.length() > 0) {
+            vertex.addEdge(new Edge<Text, NullWritable>(new Text(adjNode), null));
+          }
+        }
+      }
+      return true;
+    }
+  }
+
+  /**
+   * Writes the command line synopsis to standard output and then terminates
+   * the JVM with exit status -1.
+   */
+  private static void printUsage() {
+    String usage = "Usage: <input> <output> "
+        + "[maximum iterations (default 30)] [tasks] [seed]";
+    System.out.println(usage);
+    System.exit(-1);
+  }
+
+  /**
+   * Configures and runs the bipartite matching job.
+   *
+   * @param args {@code <input> <output> [maximum iterations] [tasks] [seed]};
+   *          the defaults are 30 iterations, 2 tasks and the current time in
+   *          milliseconds as seed. With fewer than two arguments the usage is
+   *          printed and the JVM exits.
+   */
+  public static void main(String... args) throws IOException,
+      InterruptedException, ClassNotFoundException {
+
+    if (args.length < 2) {
+      printUsage();
+    }
+
+    HamaConfiguration conf = new HamaConfiguration(new Configuration());
+    GraphJob job = new GraphJob(conf, BipartiteMatching.class);
+
+    // set the defaults
+    job.setMaxIteration(30);
+    job.setNumBspTask(2);
+    conf.set(SEED_CONFIGURATION_KEY, System.currentTimeMillis() + "");
+
+    // optional arguments override the defaults; each check uses ">=" so that
+    // surplus trailing arguments never disable an earlier option
+    if (args.length >= 3) {
+      job.setMaxIteration(Integer.parseInt(args[2]));
+    }
+    if (args.length >= 4) {
+      job.setNumBspTask(Integer.parseInt(args[3]));
+    }
+    if (args.length >= 5) {
+      conf.set(SEED_CONFIGURATION_KEY, args[4]);
+    }
+
+    job.setJobName("BipartiteMatching");
+    job.setInputPath(new Path(args[0]));
+    job.setOutputPath(new Path(args[1]));
+
+    job.setVertexClass(BipartiteMatchingVertex.class);
+    job.setVertexIDClass(Text.class);
+    job.setVertexValueClass(TextPair.class);
+    job.setEdgeValueClass(NullWritable.class);
+
+    job.setInputKeyClass(LongWritable.class);
+    job.setInputValueClass(Text.class);
+    job.setInputFormat(TextInputFormat.class);
+    job.setVertexInputReaderClass(BipartiteMatchingVertexReader.class);
+    job.setPartitioner(HashPartitioner.class);
+    job.setOutputFormat(TextOutputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(TextPair.class);
+
+    long startTime = System.currentTimeMillis();
+    if (job.waitForCompletion(true)) {
+      System.out.println("Job Finished in "
+          + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
+    }
+  }
+
+}
Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java?rev=1350715&r1=1350714&r2=1350715&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java Fri Jun 15 17:38:54 2012
@@ -34,6 +34,8 @@ public class ExampleDriver {
pgd.addClass("bench", RandBench.class, "Random Benchmark");
pgd.addClass("pagerank", PageRank.class, "PageRank");
pgd.addClass("inlnkcount", InlinkCount.class, "InlinkCount");
+ pgd.addClass("bipartite", BipartiteMatching.class,
+ "Bipartite Matching");
pgd.driver(args);
} catch (Throwable e) {
e.printStackTrace();
Added: hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextPair.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextPair.java?rev=1350715&view=auto
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextPair.java (added)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextPair.java Fri Jun 15 17:38:54 2012
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples.util;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+import com.google.common.base.Objects;
+
+/**
+ * TextPair class for use in BipartiteMatching algorithm.
+ *
+ */
+public final class TextPair implements Writable{
+
+ Text first;
+ Text second;
+
+ String nameFirst = "First";
+ String nameSecond = "Second";
+
+ public TextPair(){
+ first = new Text();
+ second = new Text();
+ }
+
+ public TextPair(Text first, Text second){
+ this.first = first;
+ this.second = second;
+ }
+
+ /**
+ * Sets the names of the attributes
+ */
+ public TextPair setNames(String nameFirst, String nameSecond){
+ this.nameFirst = nameFirst;
+ this.nameSecond = nameSecond;
+ return this;
+ }
+
+
+
+ public Text getFirst() {
+ return first;
+ }
+
+ public void setFirst(Text first) {
+ this.first = first;
+ }
+
+ public Text getSecond() {
+ return second;
+ }
+
+ public void setSecond(Text second) {
+ this.second = second;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ first.write(out);
+ second.write(out);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ first.readFields(in);
+ second.readFields(in);
+ }
+
+ @Override
+ public String toString(){
+ return Objects.toStringHelper(this)
+ .add(nameFirst, getFirst())
+ .add(nameSecond, getSecond())
+ .toString();
+ }
+
+
+}
Added: hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java?rev=1350715&view=auto
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java (added)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java Fri Jun 15 17:38:54 2012
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.HamaConfiguration;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+/**
+ * Runs the BipartiteMatching example on a four vertex graph with a fixed
+ * seed and compares the written result with the expected matching.
+ */
+public class BipartiteMatchingTest extends TestCase {
+
+  private String[] input = {
+      "A L:B D",
+      "B R:A C",
+      "C L:B D",
+      "D R:A C"
+  };
+
+  // separator between key and value in the job's text output
+  private final static String DELIMITER = "\t";
+
+  // expected output line value per vertex id for seed "2"
+  @SuppressWarnings("serial")
+  private Map<String, String> output1 = new HashMap<String, String>() {
+    {
+      put("C", "TextPair{MatchVertex=D, Component=L}");
+      put("A", "TextPair{MatchVertex=B, Component=L}");
+      put("D", "TextPair{MatchVertex=C, Component=R}");
+      put("B", "TextPair{MatchVertex=A, Component=R}");
+    }
+  };
+
+  private static String INPUT = "/tmp/graph.txt";
+  private static String OUTPUT = "/tmp/graph-bipartite";
+
+  private Configuration conf = new HamaConfiguration();
+  private FileSystem fs;
+
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+    fs = FileSystem.get(conf);
+  }
+
+  /**
+   * Writes the input graph to {@code INPUT}.
+   *
+   * @throws IOException if the file cannot be created or written, so that the
+   *           test fails at the real cause instead of later on missing input
+   */
+  private void generateTestData() throws IOException {
+    PrintWriter pout = new PrintWriter(new BufferedWriter(
+        new FileWriter(INPUT)));
+    try {
+      for (String line : input) {
+        pout.println(line);
+      }
+    } finally {
+      // closing the outermost writer flushes and closes the wrapped ones
+      pout.close();
+    }
+    // PrintWriter never throws on write errors, it only records them
+    if (pout.checkError()) {
+      throw new IOException("Could not write test input to " + INPUT);
+    }
+  }
+
+  /**
+   * Reads every non-empty part file of the job output and checks that each
+   * expected vertex appears exactly once with the expected value.
+   */
+  private void verifyResult() throws IOException {
+    FileStatus[] files = fs.globStatus(new Path(OUTPUT + "/part-*"));
+    assertNotNull("No output found under " + OUTPUT, files);
+
+    // entries are removed as they are seen; leftovers are missing vertices
+    Map<String, String> remaining = new HashMap<String, String>(output1);
+    Text key = new Text();
+    Text value = new Text();
+    for (FileStatus file : files) {
+      if (file.getLen() > 0) {
+        FSDataInputStream in = fs.open(file.getPath());
+        BufferedReader bin = new BufferedReader(new InputStreamReader(in));
+        try {
+          String s = bin.readLine();
+          while (s != null) {
+            next(key, value, s);
+            String expValue = remaining.remove(key.toString());
+            assertEquals("Unexpected result for vertex " + key, expValue,
+                value.toString());
+            System.out.println(key + " " + value);
+            s = bin.readLine();
+          }
+        } finally {
+          // also releases the stream when an assertion above fails
+          bin.close();
+        }
+      }
+    }
+    assertTrue("Vertices missing from the output: " + remaining.keySet(),
+        remaining.isEmpty());
+  }
+
+  /**
+   * Splits an output line into key and value.
+   */
+  private static void next(Text key, Text value, String line) {
+    String[] lineA = line.split(DELIMITER);
+    assertTrue("Malformed output line: " + line, lineA.length >= 2);
+    key.set(lineA[0]);
+    value.set(lineA[1]);
+  }
+
+  /**
+   * Removes the input file and the output directory. Failures are only
+   * printed because this is best-effort cleanup.
+   */
+  private void deleteTempDirs() {
+    try {
+      if (fs.exists(new Path(INPUT)))
+        fs.delete(new Path(INPUT), true);
+      if (fs.exists(new Path(OUTPUT)))
+        fs.delete(new Path(OUTPUT), true);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  @Test
+  public void testBipartiteMatching() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    generateTestData();
+    try {
+      String seed = "2";
+      BipartiteMatching.main(new String[] { INPUT, OUTPUT, "30", "2",
+          seed });
+      verifyResult();
+    } finally {
+      deleteTempDirs();
+    }
+  }
+
+}