You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2015/02/12 05:53:21 UTC
svn commit: r1659146 - in /hama/trunk: core/src/main/java/org/apache/hama/
core/src/main/java/org/apache/hama/bsp/
graph/src/main/java/org/apache/hama/graph/
Author: edwardyoon
Date: Thu Feb 12 04:53:20 2015
New Revision: 1659146
URL: http://svn.apache.org/r1659146
Log:
Minor code refactor
Modified:
hama/trunk/core/src/main/java/org/apache/hama/Constants.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java
Modified: hama/trunk/core/src/main/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/Constants.java?rev=1659146&r1=1659145&r2=1659146&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/Constants.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/Constants.java Thu Feb 12 04:53:20 2015
@@ -105,6 +105,7 @@ public interface Constants {
// Job configuration related parameters.
// /////////////////////////////////////////////
public static final String JOB_INPUT_DIR = "bsp.input.dir";
+ public static final String JOB_OUTPUT_DIR = "bsp.output.dir";
public static final String JOB_PEERS_COUNT = "bsp.peers.num";
public static final String INPUT_FORMAT_CLASS = "bsp.input.format.class";
public static final String OUTPUT_FORMAT_CLASS = "bsp.output.format.class";
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1659146&r1=1659145&r2=1659146&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Thu Feb 12 04:53:20 2015
@@ -331,7 +331,7 @@ public class BSPJobClient extends Config
short replication = (short) job.getInt("bsp.submit.replication", 10);
// only create the splits if we have an input
- if ((job.get("bsp.input.dir") != null)
+ if ((job.get(Constants.JOB_INPUT_DIR) != null)
|| (job.get("bsp.join.expr") != null)) {
// Create the splits for the job
LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1659146&r1=1659145&r2=1659146&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Thu Feb 12 04:53:20 2015
@@ -257,7 +257,7 @@ public final class BSPPeerImpl<K1, V1, K
wrap.initCause(exp);
throw wrap;
}
-
+
if (inputSplit != null) {
DataInputBuffer splitBuffer = new DataInputBuffer();
splitBuffer.reset(split.getBytes(), 0, split.getLength());
@@ -314,12 +314,14 @@ public final class BSPPeerImpl<K1, V1, K
@SuppressWarnings("unchecked")
public final void initializeIO() throws Exception {
- initInput();
+ if (conf.get(Constants.JOB_INPUT_DIR) != null) {
+ initInput();
+ }
String outdir = null;
- if (conf.get("bsp.output.dir") != null) {
- Path outputDir = new Path(conf.get("bsp.output.dir",
- "tmp-" + System.currentTimeMillis()), Task.getOutputName(partition));
+ if (conf.get(Constants.JOB_OUTPUT_DIR) != null) {
+ Path outputDir = new Path(conf.get(Constants.JOB_OUTPUT_DIR, "tmp-"
+ + System.currentTimeMillis()), Task.getOutputName(partition));
outdir = outputDir.makeQualified(fs).toString();
}
outWriter = bspJob.getOutputFormat().getRecordWriter(fs, bspJob, outdir);
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java?rev=1659146&r1=1659145&r2=1659146&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java Thu Feb 12 04:53:20 2015
@@ -17,14 +17,11 @@
*/
package org.apache.hama.graph;
-import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
-import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.DataInputBuffer;
@@ -111,29 +108,6 @@ public final class GraphJobMessage imple
return vertexId;
}
- private ByteArrayInputStream bis = null;
- private DataInputStream dis = null;
- List<Writable> valuesCache = new ArrayList<Writable>();
-
- public List<Writable> getValues() {
- bis = new ByteArrayInputStream(byteBuffer.toByteArray());
- dis = new DataInputStream(bis);
-
- if (valuesCache.isEmpty()) {
- for (int i = 0; i < numOfValues; i++) {
- try {
- Writable v = GraphJobRunner.createVertexValue();
- v.readFields(dis);
- valuesCache.add(v);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
-
- return valuesCache;
- }
-
public byte[] getValuesBytes() {
return byteBuffer.toByteArray();
}
@@ -163,6 +137,10 @@ public final class GraphJobMessage imple
add(v);
}
+ public int getNumOfValues() {
+ return this.numOfValues;
+ }
+
public GraphJobMessage(IntWritable size) {
this.flag = VERTICES_SIZE_FLAG;
this.integerMessage = size;
@@ -222,7 +200,6 @@ public final class GraphJobMessage imple
byte[] temp = new byte[bytesLength];
in.readFully(temp);
bufferDos.write(temp);
- bufferDos.flush();
} else if (isMapMessage()) {
map = new MapWritable();
map.readFields(in);
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1659146&r1=1659145&r2=1659146&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Thu Feb 12 04:53:20 2015
@@ -17,11 +17,12 @@
*/
package org.apache.hama.graph;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
@@ -230,19 +231,22 @@ public final class GraphJobRunner<V exte
notComputedVertices = new HashSet();
notComputedVertices.addAll(vertices.keySet());
- List<M> msgs = null;
+ Iterable<Writable> msgs = null;
Vertex<V, E, M> vertex = null;
while (currentMessage != null) {
vertex = vertices.get((V) currentMessage.getVertexId());
- msgs = (List<M>) currentMessage.getValues();
+ final int numOfValues = currentMessage.getNumOfValues();
+ final byte[] serializedMsgs = currentMessage.getValuesBytes();
+ msgs = getIterableMessages(numOfValues, serializedMsgs);
+
if (vertex.isHalted()) {
vertex.setActive();
}
if (!vertex.isHalted()) {
- vertex.compute(msgs);
+ vertex.compute((Iterable<M>) msgs);
notComputedVertices.remove(vertex.getVertexID());
activeVertices++;
}
@@ -644,6 +648,42 @@ public final class GraphJobRunner<V exte
return (X) ReflectionUtils.newInstance(EDGE_VALUE_CLASS);
}
+ public static Iterable<Writable> getIterableMessages(final int numOfValues,
+ final byte[] msgBytes) {
+
+ return new Iterable<Writable>() {
+ @Override
+ public Iterator<Writable> iterator() {
+ return new Iterator<Writable>() {
+ ByteArrayInputStream bis = new ByteArrayInputStream(msgBytes);
+ DataInputStream dis = new DataInputStream(bis);
+ int index = 0;
+
+ @Override
+ public boolean hasNext() {
+ return (index < numOfValues) ? true : false;
+ }
+
+ @Override
+ public Writable next() {
+ Writable v = GraphJobRunner.createVertexValue();
+ try {
+ v.readFields(dis);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ index++;
+ return v;
+ }
+
+ @Override
+ public void remove() {
+ }
+ };
+ }
+ };
+ }
+
public int getChangedVertexCnt() {
return changedVertexCnt;
}
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java?rev=1659146&r1=1659145&r2=1659146&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/OffHeapVerticesInfo.java Thu Feb 12 04:53:20 2015
@@ -17,6 +17,11 @@
*/
package org.apache.hama.graph;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListMap;
+
import org.apache.directmemory.DirectMemory;
import org.apache.directmemory.cache.CacheService;
import org.apache.directmemory.memory.Pointer;
@@ -29,11 +34,6 @@ import org.apache.hama.HamaConfiguration
import org.apache.hama.bsp.TaskAttemptID;
import org.apache.hama.util.ReflectionUtils;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.concurrent.ConcurrentSkipListMap;
-
/**
* An off heap version of a {@link org.apache.hama.graph.Vertex} storage.
*/
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java?rev=1659146&r1=1659145&r2=1659146&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessageManager.java Thu Feb 12 04:53:20 2015
@@ -39,7 +39,7 @@ public class OutgoingVertexMessageManage
.getLog(OutgoingVertexMessageManager.class);
private Combiner<Writable> combiner;
-
+ private Iterable<Writable> msgs;
private HashMap<InetSocketAddress, MessagePerVertex> storage = new HashMap<InetSocketAddress, MessagePerVertex>();
@SuppressWarnings("unchecked")
@@ -72,12 +72,15 @@ public class OutgoingVertexMessageManage
msgPerVertex.add(vertexID, msg);
// Combining messages
- if (combiner != null
- && msgPerVertex.get(vertexID).getValues().size() > 1) {
- storage.get(targetPeerAddress).put(
- vertexID,
- new GraphJobMessage(vertexID, combiner.combine(msgPerVertex.get(
- vertexID).getValues())));
+ if (combiner != null && msgPerVertex.get(vertexID).getNumOfValues() > 1) {
+
+ final int numOfValues = msgPerVertex.get(vertexID).getNumOfValues();
+ final byte[] msgBytes = msgPerVertex.get(vertexID).getValuesBytes();
+ msgs = GraphJobRunner.getIterableMessages(numOfValues, msgBytes);
+
+ // Overwrite
+ storage.get(targetPeerAddress).put(vertexID,
+ new GraphJobMessage(vertexID, combiner.combine(msgs)));
}
} else {
outgoingBundles.get(targetPeerAddress).addMessage(msg);