You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2014/10/09 13:38:46 UTC
svn commit: r1630373 - in /hama/trunk: CHANGES.txt
commons/src/main/java/org/apache/hama/commons/io/FloatArrayWritable.java
examples/src/main/java/org/apache/hama/examples/MaxFlow.java
examples/src/test/java/org/apache/hama/examples/MaxFlowTest.java
Author: edwardyoon
Date: Thu Oct 9 11:38:45 2014
New Revision: 1630373
URL: http://svn.apache.org/r1630373
Log:
HAMA-907: Add MaxFlow example
Added:
hama/trunk/commons/src/main/java/org/apache/hama/commons/io/FloatArrayWritable.java (with props)
hama/trunk/examples/src/main/java/org/apache/hama/examples/MaxFlow.java (with props)
hama/trunk/examples/src/test/java/org/apache/hama/examples/MaxFlowTest.java (with props)
Modified:
hama/trunk/CHANGES.txt
Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1630373&r1=1630372&r2=1630373&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Thu Oct 9 11:38:45 2014
@@ -4,6 +4,7 @@ Release 0.7.0 (unreleased changes)
NEW FEATURES
+ HAMA-907: Add MaxFlow example (Zhengjun via edwardyoon)
HAMA-915: Add Kryo serializer (edwardyoon)
HAMA-726: Hama on Mesos (Jeff Fenchel via edwardyoon)
HAMA-863: Implement SparseVector (Yexi Jiang)
Added: hama/trunk/commons/src/main/java/org/apache/hama/commons/io/FloatArrayWritable.java
URL: http://svn.apache.org/viewvc/hama/trunk/commons/src/main/java/org/apache/hama/commons/io/FloatArrayWritable.java?rev=1630373&view=auto
==============================================================================
--- hama/trunk/commons/src/main/java/org/apache/hama/commons/io/FloatArrayWritable.java (added)
+++ hama/trunk/commons/src/main/java/org/apache/hama/commons/io/FloatArrayWritable.java Thu Oct 9 11:38:45 2014
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.commons.io;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.FloatWritable;
+
+public class FloatArrayWritable extends ArrayWritable {
+ public FloatArrayWritable() {
+ super(FloatWritable.class);
+ }
+
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(this.get().length); // write values
+ for (int i = 0; i < this.get().length; i++) {
+ this.get()[i].write(out);
+ }
+ }
+}
Propchange: hama/trunk/commons/src/main/java/org/apache/hama/commons/io/FloatArrayWritable.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: hama/trunk/examples/src/main/java/org/apache/hama/examples/MaxFlow.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/MaxFlow.java?rev=1630373&view=auto
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/MaxFlow.java (added)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/MaxFlow.java Thu Oct 9 11:38:45 2014
@@ -0,0 +1,662 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.HashPartitioner;
+import org.apache.hama.bsp.SequenceFileInputFormat;
+import org.apache.hama.bsp.TextOutputFormat;
+import org.apache.hama.commons.io.FloatArrayWritable;
+import org.apache.hama.graph.AbstractAggregator;
+import org.apache.hama.graph.Edge;
+import org.apache.hama.graph.GraphJob;
+import org.apache.hama.graph.Vertex;
+import org.apache.hama.graph.VertexInputReader;
+
+/**
+ * Maxflow algorithm, find the max flow from source vertex to sink vertex in a
+ * graph.
+ */
+
+public class MaxFlow {
+ public static class MaxFlowVertex extends
+ Vertex<Text, FloatArrayWritable, FloatArrayWritable> {
+ static double DAMPING_FACTOR = 0.85;
+ static double MAXIMUM_CONVERGENCE_ERROR = 0.001;
+ static String OUTPUT = null;
+ static FloatWritable SENSEMSG = new FloatWritable(-1);
+ static FloatWritable PUSHMSG = new FloatWritable(-2);
+ static FloatWritable OVERFLOWMSG = new FloatWritable(-7);
+ static FloatWritable FIRSTPUSHMSG = new FloatWritable(-8);
+ static FloatWritable INEDGE = new FloatWritable(-4);
+ static FloatWritable OUTEDGE = new FloatWritable(-5);
+ private int senseStepCount = 0;
+ private int pushStepCount = 0;
+ private FloatWritable overFlow = new FloatWritable(0.0f);
+ private FloatWritable preSuperStepOverFlow = new FloatWritable(0.0f);
+ FloatArrayWritable msg0 = new FloatArrayWritable();
+ FloatWritable[] mArray0 = new FloatWritable[5];
+ public ArrayList<FloatArrayWritable> senseMsgList = new ArrayList<FloatArrayWritable>();
+ public ArrayList<FloatArrayWritable> pushMsgList = new ArrayList<FloatArrayWritable>();
+ public LinkedList<Edge<Text, FloatArrayWritable>> inEdges = new LinkedList<Edge<Text, FloatArrayWritable>>();;
+ public LinkedList<Edge<Text, FloatArrayWritable>> outEdges = new LinkedList<Edge<Text, FloatArrayWritable>>();
+ public LinkedList<Edge<Text, FloatArrayWritable>> flowedOutEdges = new LinkedList<Edge<Text, FloatArrayWritable>>();
+ static public LinkedList<FloatArrayWritable> overFlowMsgList = new LinkedList<FloatArrayWritable>();
+
+ @Override
+ public void setup(HamaConfiguration conf) {
+ }
+
+ public void addInEdge(Edge<Text, FloatArrayWritable> edge) {
+ if (this.inEdges == null)
+ this.inEdges = new LinkedList<Edge<Text, FloatArrayWritable>>();
+ this.inEdges.add(edge);
+ }
+
+ public void addOutEdge(Edge<Text, FloatArrayWritable> edge) {
+ if (this.outEdges == null)
+ this.outEdges = new LinkedList<Edge<Text, FloatArrayWritable>>();
+ this.outEdges.add(edge);
+ }
+
+ @Override
+ public void compute(Iterable<FloatArrayWritable> messages)
+ throws IOException {
+ if (getSuperstepCount() == 0) {
+ aggregate();
+ init();
+ return;
+ }
+
+ if (!getVertexID().equals(getSinkID())) {
+ normalVertexCompute(messages);
+ }
+ ;
+
+ if (getVertexID().equals(getSinkID())) {
+ masterVertexCompute(messages);
+ }
+ }
+
+ private void init() throws IOException {
+ for (Edge<Text, FloatArrayWritable> e : getEdges()) {
+ if (e.getDestinationVertexID().equals(getVertexID()))
+ continue;
+ if (getEdgeCapacity(e).get() < 0) {
+ float cap = Math.abs(getEdgeCapacity(e).get());
+ setEdgeCapacity(e, new FloatWritable(cap));
+ setEdgeType(e.getValue(), INEDGE);
+ addInEdge(e);
+ } else {
+ setEdgeType(e.getValue(), OUTEDGE);
+ addOutEdge(e);
+ }
+ }
+
+ if (getVertexID().equals(getSinkID())) {
+ senseToNeighbors(new FloatWritable(Float.MAX_VALUE));
+ return;
+ }
+ if (getVertexID().equals(getSourceID())) {
+ overFlow.set(Float.MAX_VALUE);
+ return;
+ }
+ voteToHalt();
+ }
+
+ private void normalVertexCompute(Iterable<FloatArrayWritable> messages)
+ throws IOException {
+ aggregate();
+ ArrayList<FloatArrayWritable> pMsgList = new ArrayList<FloatArrayWritable>();
+ boolean senseToNeighbor = false;
+ for (FloatArrayWritable msg : messages) {
+ if (msg == null)
+ continue;
+ if (getMsgType(msg).equals(SENSEMSG)) {
+ if (isFirstSense(msg)) {
+ senseToNeighbor = true;
+ }
+ senseMsgList.add(msg);
+ } else if (getMsgType(msg).equals(PUSHMSG)) {
+ pMsgList.add(msg);
+ pushMsgList.add(msg);
+ } else if (getMsgType(msg).equals(FIRSTPUSHMSG)) {
+ pushFlowToNeighbors();
+ }
+ }
+ if (senseToNeighbor == true) {
+ senseToNeighbors(new FloatWritable(Float.MAX_VALUE));
+ if (overFlow.get() > 0) {
+ sendOverFlowNodeToMaster();
+ }
+ }
+ if (pMsgList.size() > 0) {
+ pushMsgComsume(pMsgList);
+ }
+ voteToHalt();
+ }
+
+ private void masterVertexCompute(Iterable<FloatArrayWritable> messages)
+ throws IOException {
+ receiveFlowFromNeighbors(messages);
+ boolean haveActivingNormalVertex = HaveActivingNormalVertex();
+ stepStatusDetecting(haveActivingNormalVertex);
+ boolean pushStepCompleted = pushStepCompleted(haveActivingNormalVertex);
+ boolean senseStepCompleted = senseStepCompleted(haveActivingNormalVertex);
+ if (senseStepCompleted) {
+ aggregate();
+ for (FloatArrayWritable msg : overFlowMsgList) {
+ msg0.set(mArray0);
+ setMsgType(msg0, FIRSTPUSHMSG);
+ setMsgPushStep(msg0, pushStepCount);
+ setMsgVertexID(msg0, textToFloat(getVertexID()));
+ setMsgFlow(msg0, 0.0f);
+ setEdgeType(msg0, new FloatWritable());
+ sendMessage(floatToText(getMsgVertexID(msg)), msg0);
+ }
+ overFlowMsgList.clear();
+ }
+ if (pushStepCompleted) {
+ aggregate();
+ if (algorithmCompleted()) {
+ log(1111, "maxFlow is: " + overFlow.get());
+ printMaxFlowValue(overFlow.get(), OUTPUT + "/maxflow");
+ voteToHalt();
+ } else {
+ preSuperStepOverFlow.set(overFlow.get());
+ senseToNeighbors(new FloatWritable(Float.MAX_VALUE));
+ }
+ }
+ ;
+ }
+
+ private void senseToNeighbors(FloatWritable needFlow) throws IOException {
+
+ msg0.set(mArray0);
+ setMsgSenseStep(msg0, senseStepCount);
+ mArray0[0] = SENSEMSG; // pos 0 store msg type.
+ setMsgType(msg0, SENSEMSG);
+ setMsgVertexID(msg0, textToFloat(getVertexID()));
+ for (Edge<Text, FloatArrayWritable> e : inEdges) {
+ setEdgeType(msg0, OUTEDGE);
+ float nflow = Math.min(getSpareFlow(e), needFlow.get());
+ boolean sendMsg = true;
+ if (nflow > 0) {
+ setNeedFlow(msg0, nflow);
+ for (FloatArrayWritable m : senseMsgList) {
+ if (getMsgVertexID(m).equals(e.getDestinationVertexID())) {
+ sendMsg = false;
+ break;
+ }
+ }
+ if (sendMsg == true)
+ sendMessage(e.getDestinationVertexID(), msg0);
+ }
+ }
+ for (Edge<Text, FloatArrayWritable> e : outEdges) {
+ setEdgeType(msg0, INEDGE);
+ setNeedFlow(msg0, Math.min(getSpareFlow(e), needFlow.get()));
+ float nflow = Math.min(getSpareFlow(e), needFlow.get());
+ boolean sendMsg = true;
+ if (nflow > 0) {
+ setNeedFlow(msg0, nflow);
+ if (!e.getDestinationVertexID().equals(getSinkID())) {
+
+ for (FloatArrayWritable m : senseMsgList) {
+ if (getMsgVertexID(m).equals(e.getDestinationVertexID())) {
+ sendMsg = false;
+ break;
+ }
+ }
+ if (sendMsg == true)
+ sendMessage(e.getDestinationVertexID(), msg0);
+ }
+ }
+ }
+ }
+
+ private void sendOverFlowNodeToMaster() throws IOException {
+ msg0.set(mArray0);
+ setMsgType(msg0, OVERFLOWMSG);
+ setMsgVertexID(msg0, textToFloat(getVertexID()));
+ setMsgFlow(msg0, 0.0f);
+ setEdgeType(msg0, new FloatWritable());
+ sendMessage(getSinkID(), msg0);
+ }
+
+ private void pushMsgComsume(Iterable<FloatArrayWritable> msgList)
+ throws IOException {
+ receiveFlowFromNeighbors(msgList);
+ pushFlowToNeighbors();
+ }
+
+ private void receiveFlowFromNeighbors(Iterable<FloatArrayWritable> msgList) {
+ for (FloatArrayWritable msg : msgList) {
+ if (msg == null)
+ continue;
+ if (getMsgType(msg).equals(OVERFLOWMSG)) {
+ overFlowMsgList.add(msg);
+ continue;
+ }
+ Edge<Text, FloatArrayWritable> e = getEdgeAccordToMsg(msg);
+ FloatWritable oldFlow = getEdgeFlow(e);
+ FloatWritable flow = getMsgFlow(msg);
+
+ if (getEdgeType(e.getValue()).equals(INEDGE)) {
+ oldFlow.set(oldFlow.get() + flow.get());
+ } else if (getEdgeType(e.getValue()).equals(OUTEDGE)) {
+ oldFlow.set(oldFlow.get() - flow.get());
+ }
+ overFlow.set(overFlow.get() + flow.get());
+ }
+ }
+
+ private void pushFlowToNeighbors() throws IOException {
+ ArrayList<FloatArrayWritable> inSenseList = new ArrayList<FloatArrayWritable>();
+ ArrayList<FloatArrayWritable> outSenseList = new ArrayList<FloatArrayWritable>();
+ Iterator<FloatArrayWritable> iter = senseMsgList.iterator();
+ while (iter.hasNext()) {
+ FloatArrayWritable msg = (FloatArrayWritable) iter.next();
+ if (getMsgSenseStep(msg) < senseStepCount) {
+ iter.remove();
+ continue;
+ }
+ boolean rm = false;
+ for (FloatArrayWritable m : pushMsgList) {
+ if (getMsgVertexID(msg).equals(getMsgVertexID(m))) {
+ rm = true;
+ break;
+ }
+ }
+ if (rm == true) {
+ iter.remove();
+ }
+ }
+
+ for (FloatArrayWritable msg : senseMsgList) {
+ if (getEdgeType(msg).equals(INEDGE))
+ inSenseList.add(msg);
+ else if (getEdgeType(msg).equals(OUTEDGE))
+ outSenseList.add(msg);
+ }
+ msg0.set(mArray0);
+ setMsgType(msg0, PUSHMSG);
+ setMsgVertexID(msg0, textToFloat(getVertexID()));
+ for (FloatArrayWritable msg : outSenseList) {
+ Edge<Text, FloatArrayWritable> e = getEdgeAccordToMsg(msg);
+ if (overFlow.get() > 0) {
+ FloatWritable oldFlow = getEdgeFlow(e);
+ FloatWritable cap = getEdgeCapacity(e);
+ float flow = Math.min(overFlow.get(), cap.get() - oldFlow.get());
+ if (flow > 0) {
+ overFlow.set(overFlow.get() - flow);
+ oldFlow.set(oldFlow.get() + flow);
+ setEdgeType(msg0, INEDGE);
+ setMsgFlow(msg0, flow);
+ sendMessage(e, msg0);
+ }
+ } else {
+ break;
+ }
+ }
+ for (FloatArrayWritable msg : inSenseList) {
+ Edge<Text, FloatArrayWritable> e = getEdgeAccordToMsg(msg);
+ if (overFlow.get() > 0) {
+ FloatWritable oldFlow = (FloatWritable) e.getValue().get()[0];
+ float flow = Math.min(overFlow.get(), oldFlow.get());
+ if (flow > 0) {
+ overFlow.set(overFlow.get() - flow);
+ oldFlow.set(oldFlow.get() - flow);
+ setEdgeType(msg0, OUTEDGE);
+ setMsgFlow(msg0, flow);
+ sendMessage(e, msg0);
+ }
+ } else {
+ break;
+ }
+ }
+ }
+
+ private boolean algorithmCompleted() {
+ if (preSuperStepOverFlow.equals(overFlow) && overFlow.get() > 0)
+ return true;
+ else
+ return false;
+ }
+
+ private void stepStatusDetecting(Boolean haveActivingNormalVertex) {
+ if (!haveActivingNormalVertex) {
+ if (senseStepCount == pushStepCount) {
+ senseStepCount++;
+ } else if (senseStepCount > pushStepCount) {
+ pushStepCount++;
+ }
+ }
+ }
+
+ private boolean pushStepCompleted(boolean haveActivingNormalVertex) {
+ if (haveActivingNormalVertex)
+ return false;
+ if (getSuperstepCount() <= 2)
+ return false;
+ return senseStepCount > pushStepCount;
+
+ }
+
+ private boolean senseStepCompleted(boolean haveActivingNormalVertex) {
+ if (haveActivingNormalVertex)
+ return false;
+ if (getSuperstepCount() <= 2)
+ return false;
+ return senseStepCount == pushStepCount;
+ }
+
+ private boolean HaveActivingNormalVertex() {
+ FloatArrayWritable value = getAggregatedValue(0);
+ if (value == null)
+ return true;
+ FloatWritable temp = (FloatWritable) value.get()[0];
+ if (temp.get() < getSuperstepCount() - 1) {
+ return false;
+ } else if (temp.get() == getSuperstepCount() - 1) {
+ return true;
+ }
+ return true;
+ }
+
+ private float getSpareFlow(Edge<Text, FloatArrayWritable> e) {
+ FloatWritable type = getEdgeType(e.getValue());
+ float cap = getEdgeCapacity(e).get();
+ float flow = getEdgeFlow(e).get();
+ float ret = 0.0f;
+ if (type.equals(INEDGE)) {
+ ret = cap - flow;
+ } else if (type.equals(OUTEDGE)) {
+ ret = flow;
+ }
+ return ret;
+ }
+
+ private void aggregate() throws IOException {
+ FloatArrayWritable array = new FloatArrayWritable();
+ FloatWritable[] farray = new FloatWritable[1];
+ array.set(farray);
+ farray[0] = new FloatWritable((float) getSuperstepCount());
+ aggregate(0, array);
+ }
+
+ private Edge<Text, FloatArrayWritable> getEdgeAccordToMsg(
+ FloatArrayWritable msg) {
+ Text vertexID = floatToText(getMsgVertexID(msg));
+ for (Edge<Text, FloatArrayWritable> e : inEdges) {
+ if (vertexID.equals(e.getDestinationVertexID()))
+ return e;
+ }
+ for (Edge<Text, FloatArrayWritable> e : outEdges) {
+ if (vertexID.equals(e.getDestinationVertexID()))
+ return e;
+ }
+ return null;
+ }
+
+ private boolean isFirstSense(FloatArrayWritable msg) {
+ if (getMsgSenseStep(msg) > senseStepCount) {
+ senseStepCount = getMsgSenseStep(msg);
+ pushMsgList.clear();
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ private void printMaxFlowValue(float value, String path) throws IOException {
+ File f = new File(path);
+ OutputStreamWriter in;
+ BufferedWriter bw = null;
+ try {
+ in = new OutputStreamWriter(new FileOutputStream(f));
+ bw = new BufferedWriter(in);
+ bw.write(String.valueOf(value));
+ bw.close();
+ } catch (FileNotFoundException e) {
+ // TODO Auto-generated catch block
+ // e.printStackTrace();
+ } finally {
+ if (bw != null)
+ bw.close();
+ }
+ }
+
+ private Text getSinkID() {
+ return new Text(String.valueOf(getNumVertices() - 1));
+ }
+
+ private Text getSourceID() {
+ return new Text(String.valueOf(0));
+ }
+
+ public static Text floatToText(FloatWritable value) {
+ return new Text(String.valueOf((int) (value.get())));
+ }
+
+ public static FloatWritable textToFloat(Text value) {
+ return new FloatWritable(Float.parseFloat(value.toString()));
+ }
+
+ public void setMsgType(FloatArrayWritable msg, FloatWritable value) {
+ msg.get()[0] = value;
+ }
+
+ public FloatWritable getMsgType(FloatArrayWritable msg) {
+ return (FloatWritable) msg.get()[0];
+ }
+
+ public FloatWritable getMsgVertexID(FloatArrayWritable msg) {
+ return (FloatWritable) msg.get()[1];
+ }
+
+ public void setMsgVertexID(FloatArrayWritable msg, FloatWritable value) {
+ msg.get()[1] = value;
+ }
+
+ public void setEdgeType(FloatArrayWritable msg, FloatWritable type) {
+ msg.get()[2] = type;
+ }
+
+ public FloatWritable getEdgeType(FloatArrayWritable msg) {
+ return (FloatWritable) msg.get()[2];
+ }
+
+ public int getMsgSenseStep(FloatArrayWritable msg) {
+ FloatWritable value = (FloatWritable) msg.get()[4];
+ return (int) value.get();
+ }
+
+ public void setMsgSenseStep(FloatArrayWritable msg, int step) {
+ FloatWritable v = new FloatWritable(step);
+ msg.get()[4] = v;
+ }
+
+ public int getMsgPushStep(FloatArrayWritable msg) {
+ return getMsgSenseStep(msg);
+ }
+
+ public void setMsgPushStep(FloatArrayWritable msg, int step) {
+ setMsgSenseStep(msg, step);
+ }
+
+ private void setNeedFlow(FloatArrayWritable msg, float value) {
+ FloatWritable v = new FloatWritable(value);
+ msg.get()[3] = v;
+ }
+
+ private FloatWritable getNeedFlow(FloatArrayWritable msg) {
+ return (FloatWritable) msg.get()[3];
+ }
+
+ private void setMsgFlow(FloatArrayWritable msg, float value) {
+ setNeedFlow(msg, value);
+ }
+
+ private FloatWritable getMsgFlow(FloatArrayWritable msg) {
+ return getNeedFlow(msg);
+ }
+
+ private FloatWritable getEdgeFlow(Edge<Text, FloatArrayWritable> e) {
+ return getMsgType(e.getValue());
+ }
+
+ private FloatWritable getEdgeCapacity(Edge<Text, FloatArrayWritable> e) {
+ return getMsgVertexID(e.getValue());
+ }
+
+ private void setEdgeCapacity(Edge<Text, FloatArrayWritable> e,
+ FloatWritable value) {
+ setMsgVertexID(e.getValue(), value);
+ }
+
+ public void log(int level, String s) {
+ if (level > 1000)
+ System.out.println(s);
+ }
+ }
+
+ public static class MaxFLowAgrregator extends
+ AbstractAggregator<FloatArrayWritable> {
+
+ int maxValue = 0;
+
+ @Override
+ public void aggregate(FloatArrayWritable value) {
+ FloatWritable v = (FloatWritable) value.get()[0];
+ if ((int) v.get() > maxValue)
+ maxValue = (int) v.get();
+ }
+
+ @Override
+ public FloatArrayWritable getValue() {
+ FloatArrayWritable value = new FloatArrayWritable();
+ FloatWritable[] array = new FloatWritable[1];
+ FloatWritable f = new FloatWritable();
+ f.set(maxValue);
+ array[0] = f;
+ value.set(array);
+ return value;
+ }
+ }
+
+ public static class MaxFlowSeqReader
+ extends
+ VertexInputReader<FloatWritable, FloatArrayWritable, Text, FloatArrayWritable, FloatArrayWritable> {
+ @Override
+ public boolean parseVertex(FloatWritable key, FloatArrayWritable value,
+ Vertex<Text, FloatArrayWritable, FloatArrayWritable> vertex)
+ throws Exception {
+ vertex.setVertexID(MaxFlowVertex.floatToText(key));
+ Writable[] values = value.get();
+ FloatWritable v2 = (FloatWritable) values[0];
+ FloatWritable capacity = (FloatWritable) values[1];
+ FloatArrayWritable cost = new FloatArrayWritable();
+ FloatWritable[] costArray = new FloatWritable[3];
+ costArray[0] = new FloatWritable(0.0f); // store flow
+ costArray[1] = capacity; // store capacity
+ if (capacity.get() < 0) {
+ costArray[2] = new FloatWritable(-4f);
+ } else {
+ costArray[2] = new FloatWritable(-5f);
+ }
+ cost.set(costArray);
+ Edge<Text, FloatArrayWritable> e = new Edge<Text, FloatArrayWritable>(
+ MaxFlowVertex.floatToText(v2), cost);
+ (vertex).addEdge(e);
+ return true;
+ }
+ }
+
+ public static GraphJob createJob(String[] args, HamaConfiguration conf)
+ throws IOException {
+ GraphJob maxFlow = new GraphJob(conf, PageRank.class);
+ maxFlow.setJobName("MaxFlow");
+ maxFlow.setMessageClass(FloatArrayWritable.class);
+ maxFlow.setEdgeValueClass(FloatArrayWritable.class);
+ maxFlow.setVertexClass(MaxFlowVertex.class);
+ maxFlow.setInputKeyClass(FloatWritable.class);
+ maxFlow.setInputValueClass(FloatArrayWritable.class);
+ maxFlow.setAggregatorClass(MaxFLowAgrregator.class);
+ maxFlow.setInputPath(new Path(args[0]));
+ maxFlow.setOutputPath(new Path(args[1]));
+ maxFlow.set("hama.graph.self.ref", "true");
+
+ if (args.length == 3) {
+ maxFlow.setNumBspTask(Integer.parseInt(args[2]));
+ }
+ maxFlow.setNumBspTask(3);
+ maxFlow.setMaxIteration(Integer.MAX_VALUE);
+ maxFlow.setVertexInputReaderClass(MaxFlowSeqReader.class);
+
+ maxFlow.setVertexIDClass(Text.class);
+ maxFlow.setVertexValueClass(FloatArrayWritable.class);
+ maxFlow.setEdgeValueClass(FloatArrayWritable.class);
+
+ maxFlow.setInputFormat(SequenceFileInputFormat.class);
+
+ maxFlow.setPartitioner(HashPartitioner.class);
+ maxFlow.setOutputFormat(TextOutputFormat.class);
+ maxFlow.setOutputKeyClass(DoubleWritable.class);
+ maxFlow.setOutputValueClass(FloatArrayWritable.class);
+ return maxFlow;
+ }
+
+ private static void printUsage() {
+ System.out.println("Usage: <input> <output> [tasks]");
+ System.exit(-1);
+ }
+
+ public static void main(String[] args) throws IOException,
+ InterruptedException, ClassNotFoundException {
+ args[2] = "3";
+ if (args.length < 2) {
+ printUsage();
+ System.exit(-1);
+ }
+
+ HamaConfiguration conf = new HamaConfiguration();
+ GraphJob mfmcJob = createJob(args, conf);
+ long startTime = System.currentTimeMillis();
+ MaxFlowVertex.OUTPUT = args[1];
+ if (mfmcJob.waitForCompletion(true)) {
+ System.out.println("Job Finished in "
+ + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
+ }
+ }
+}
Propchange: hama/trunk/examples/src/main/java/org/apache/hama/examples/MaxFlow.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: hama/trunk/examples/src/test/java/org/apache/hama/examples/MaxFlowTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/MaxFlowTest.java?rev=1630373&view=auto
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/MaxFlowTest.java (added)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/MaxFlowTest.java Thu Oct 9 11:38:45 2014
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.commons.io.FloatArrayWritable;
+import org.junit.Test;
+
+/**
+ * Testcase for {@link MaxFlow}
+ */
+public class MaxFlowTest extends TestCase {
+ private static String INPUT = "/tmp/maxflow-tmp.seq";
+ private static String TEXT_INPUT = "/tmp/maxflow.txt";
+ private static String TEXT_OUTPUT = INPUT + "maxflow.txt.seq";
+ private static String OUTPUT = "/tmp/maxflow-output";
+ private Configuration conf = new HamaConfiguration();
+ private FileSystem fs;
+
+ private static String[] input = new String[] {
+
+ "0\t1\t16", "1\t0\t-16", "1\t3\t12", "3\t1\t-12", "3\t5\t20", "5\t3\t-20",
+ "2\t1\t4", "1\t2\t-4", "3\t2\t9", "2\t3\t-9", "4\t3\t7", "3\t4\t-7",
+ "0\t2\t13", "2\t0\t-13", "2\t4\t14", "4\t2\t-14", "4\t5\t4", "5\t4\t-4"
+
+ };
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+ fs = FileSystem.get(conf);
+ }
+
+ @Test
+ public void testMaxFlow() throws IOException, InterruptedException,
+ ClassNotFoundException, InstantiationException, IllegalAccessException {
+ generateTestData();
+ try {
+ MaxFlow.main(new String[] { INPUT, OUTPUT, "3" });
+ verifyResult();
+ } finally {
+ deleteTempDirs();
+ }
+ }
+
+ private void verifyResult() throws IOException {
+ File f = new File(OUTPUT + "/maxflow");
+ InputStreamReader in = new InputStreamReader(new FileInputStream(f));
+ @SuppressWarnings("resource")
+ BufferedReader br = new BufferedReader(in);
+ String result = br.readLine();
+ System.out.println("maxflow is: " + String.valueOf(23.0));
+ assertTrue(result.equals(String.valueOf(23.0)));
+ }
+
+ private void generateTestData() {
+
+ Configuration conf = new Configuration();
+ FileSystem fs;
+ SequenceFile.Writer writer = null;
+ try {
+ fs = FileSystem.get(conf);
+ Path path = new Path(INPUT);
+ writer = SequenceFile.createWriter(fs, conf, path, FloatWritable.class,
+ FloatArrayWritable.class);
+
+ for (String s : input) {
+ FloatArrayWritable value = new FloatArrayWritable();
+ FloatWritable[] valueArray = new FloatWritable[2];
+ value.set(valueArray);
+ String[] array = s.split("\t");
+ FloatWritable key = new FloatWritable(Float.valueOf(array[0]));
+ valueArray[0] = new FloatWritable(Float.valueOf(array[1])); // store v2.
+ valueArray[1] = new FloatWritable(Float.valueOf(array[2]));
+ System.out.println(" " + Float.valueOf(array[0]) + " "
+ + Float.valueOf(array[1]) + " " + Float.valueOf(array[2]));
+ writer.append(key, value);
+ }
+ writer.close();
+ } catch (IOException e1) {
+ // TODO Auto-generated catch block
+ e1.printStackTrace();
+ } finally {
+ if (writer != null) {
+ try {
+ writer.close();
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ private void deleteTempDirs() {
+ try {
+ if (fs.exists(new Path(INPUT)))
+ fs.delete(new Path(INPUT), true);
+ if (fs.exists(new Path(OUTPUT)))
+ fs.delete(new Path(OUTPUT), true);
+ if (fs.exists(new Path(TEXT_INPUT)))
+ fs.delete(new Path(TEXT_INPUT), true);
+ if (fs.exists(new Path(TEXT_OUTPUT)))
+ fs.delete(new Path(TEXT_OUTPUT), true);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+}
Propchange: hama/trunk/examples/src/test/java/org/apache/hama/examples/MaxFlowTest.java
------------------------------------------------------------------------------
svn:eol-style = native