You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2014/02/20 02:39:45 UTC
svn commit: r1570039 - in /hama/trunk: ./ conf/
core/src/main/java/org/apache/hama/bsp/
core/src/main/java/org/apache/hama/bsp/message/
core/src/main/java/org/apache/hama/bsp/message/compress/
core/src/test/java/org/apache/hama/bsp/ core/src/test/java/...
Author: edwardyoon
Date: Thu Feb 20 01:39:45 2014
New Revision: 1570039
URL: http://svn.apache.org/r1570039
Log:
HAMA-870: Runtime message compression in Bundle (edwardyoon)
Removed:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/CompressableMessageManager.java
Modified:
hama/trunk/CHANGES.txt
hama/trunk/conf/hama-default.xml
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManager.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingMessageManager.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressor.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/SnappyCompressor.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java
hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterJobDriver.java
hama/trunk/examples/src/test/java/org/apache/hama/examples/SemiClusterMatchingTest.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java
hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1570039&r1=1570038&r2=1570039&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Thu Feb 20 01:39:45 2014
@@ -30,6 +30,7 @@ Release 0.7.0 (unreleased changes)
IMPROVEMENTS
+ HAMA-870: Runtime message compression in Bundle (edwardyoon)
HAMA-869: Add Cloudera repository in maven pom file (Saisai Shao via edwardyoon)
HAMA-856: Optimize BSPMessageBundle (edwardyoon)
HAMA-853: Refactor Outgoing message manager (edwardyoon)
Modified: hama/trunk/conf/hama-default.xml
URL: http://svn.apache.org/viewvc/hama/trunk/conf/hama-default.xml?rev=1570039&r1=1570038&r2=1570039&view=diff
==============================================================================
--- hama/trunk/conf/hama-default.xml (original)
+++ hama/trunk/conf/hama-default.xml Thu Feb 20 01:39:45 2014
@@ -253,13 +253,14 @@
<!-- The message compression is not recommended for a large production environment -->
<property>
<name>hama.messenger.compression.class</name>
- <value></value>
+ <value>org.apache.hama.bsp.message.compress.SnappyCompressor</value>
<description>The message compression algorithm to choose. Default is null.</description>
</property>
<property>
<name>hama.messenger.compression.threshold</name>
- <value></value>
- <description>The Compressor threshold sets the level at which compression begins.</description>
+ <value>128</value>
+ <description>The Compressor threshold sets the level at which compression begins.
+ The default is 128 bytes.</description>
</property>
<property>
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java?rev=1570039&r1=1570038&r2=1570039&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java Thu Feb 20 01:39:45 2014
@@ -30,6 +30,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
/**
* BSPMessageBundle stores a group of messages so that they can be sent in batch
@@ -41,21 +42,42 @@ public class BSPMessageBundle<M extends
public static final Log LOG = LogFactory.getLog(BSPMessageBundle.class);
+ private BSPMessageCompressor<M> compressor = null;
+ private long threshold = 128;
+
private String className = null;
private int bundleSize = 0;
+ private int bundleLength = 0;
+
+ ByteArrayOutputStream byteBuffer = null;
+ DataOutputStream bufferDos = null;
- ByteArrayOutputStream bos = null;
- DataOutputStream dos = null;
ByteArrayInputStream bis = null;
DataInputStream dis = null;
public BSPMessageBundle() {
- bos = new ByteArrayOutputStream();
- dos = new DataOutputStream(bos);
+ byteBuffer = new ByteArrayOutputStream();
+ bufferDos = new DataOutputStream(byteBuffer);
bundleSize = 0;
+ bundleLength = 0;
+ }
+
+ ByteArrayOutputStream mbos = null;
+ DataOutputStream mdos = null;
+ ByteArrayInputStream mbis = null;
+ DataInputStream mdis = null;
+
+ public byte[] serialize(M message) throws IOException {
+ mbos = new ByteArrayOutputStream();
+ mdos = new DataOutputStream(mbos);
+ message.write(mdos);
+ return mbos.toByteArray();
}
+ private byte[] compressed;
+ private byte[] serialized;
+
/**
* Add message to this bundle.
*
@@ -63,7 +85,21 @@ public class BSPMessageBundle<M extends
*/
public void addMessage(M message) {
try {
- message.write(dos);
+ serialized = serialize(message);
+
+ if (compressor != null && serialized.length > threshold) {
+ bufferDos.writeBoolean(true);
+ compressed = compressor.compress(serialized);
+ bufferDos.writeInt(compressed.length);
+ bufferDos.write(compressed);
+
+ bundleLength += compressed.length;
+ } else {
+ bufferDos.writeBoolean(false);
+ bufferDos.write(serialized);
+
+ bundleLength += serialized.length;
+ }
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
@@ -76,11 +112,12 @@ public class BSPMessageBundle<M extends
}
public Iterator<M> iterator() {
- bis = new ByteArrayInputStream(bos.toByteArray());
+ bis = new ByteArrayInputStream(byteBuffer.toByteArray());
dis = new DataInputStream(bis);
-
+
Iterator<M> it = new Iterator<M>() {
M msg;
+ byte[] decompressed;
@Override
public boolean hasNext() {
@@ -98,6 +135,13 @@ public class BSPMessageBundle<M extends
@SuppressWarnings("unchecked")
@Override
public M next() {
+ boolean isCompressed = false;
+ try {
+ isCompressed = dis.readBoolean();
+ } catch (IOException e1) {
+ e1.printStackTrace();
+ }
+
Class<M> clazz = null;
try {
clazz = (Class<M>) Class.forName(className);
@@ -107,7 +151,19 @@ public class BSPMessageBundle<M extends
msg = ReflectionUtils.newInstance(clazz, null);
try {
- msg.readFields(dis);
+ if (isCompressed) {
+ int length = dis.readInt();
+ compressed = new byte[length];
+ dis.readFully(compressed);
+ decompressed = compressor.decompress(compressed);
+
+ mbis = new ByteArrayInputStream(decompressed);
+ mdis = new DataInputStream(mbis);
+ msg.readFields(mdis);
+ } else {
+ msg.readFields(dis);
+ }
+
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
@@ -128,12 +184,17 @@ public class BSPMessageBundle<M extends
return bundleSize;
}
+ public void setCompressor(BSPMessageCompressor<M> compressor, long threshold) {
+ this.compressor = compressor;
+ this.threshold = threshold;
+ }
+
/**
- * @return the byte length of bundle object
+ * @return the byte length of messages
* @throws IOException
*/
public long getLength() throws IOException {
- return bos.toByteArray().length;
+ return bundleLength;
}
@Override
@@ -141,7 +202,7 @@ public class BSPMessageBundle<M extends
out.writeInt(bundleSize);
if (bundleSize > 0) {
out.writeUTF(className);
- byte[] messages = bos.toByteArray();
+ byte[] messages = byteBuffer.toByteArray();
out.writeInt(messages.length);
out.write(messages);
}
@@ -155,7 +216,7 @@ public class BSPMessageBundle<M extends
int bytesLength = in.readInt();
byte[] temp = new byte[bytesLength];
in.readFully(temp);
- dos.write(temp);
+ bufferDos.write(temp);
}
}
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1570039&r1=1570038&r2=1570039&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Thu Feb 20 01:39:45 2014
@@ -341,7 +341,7 @@ public class LocalBSPRunner implements J
@Override
public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, M> peer,
- Configuration conf, InetSocketAddress peerAddress) {
+ HamaConfiguration conf, InetSocketAddress peerAddress) {
super.init(attemptId, peer, conf, peerAddress);
MANAGER_MAP.put(peerAddress, this);
selfAddress = peerAddress;
@@ -351,8 +351,13 @@ public class LocalBSPRunner implements J
@Override
public void transfer(InetSocketAddress addr, BSPMessageBundle<M> bundle)
throws IOException {
+ peer.incrementCounter(BSPPeerImpl.PeerCounter.MESSAGE_BYTES_TRANSFERED,
+ bundle.getLength());
+ bundle.setCompressor(compressor,
+ conf.getLong("hama.messenger.compression.threshold", 512));
+
Iterator<M> it = bundle.iterator();
- while(it.hasNext()) {
+ while (it.hasNext()) {
MANAGER_MAP.get(addr).localQueueForNextIteration.add(it.next());
peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
1L);
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1570039&r1=1570038&r2=1570039&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java Thu Feb 20 01:39:45 2014
@@ -30,10 +30,13 @@ import org.apache.hadoop.conf.Configurab
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Writable;
+import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.BSPPeerImpl;
import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
+import org.apache.hama.bsp.message.compress.BSPMessageCompressorFactory;
import org.apache.hama.bsp.message.queue.DiskQueue;
import org.apache.hama.bsp.message.queue.MemoryQueue;
import org.apache.hama.bsp.message.queue.MessageQueue;
@@ -72,6 +75,8 @@ public abstract class AbstractMessageMan
// List of listeners for all the sent messages
protected Queue<MessageEventListener<M>> messageListenerQueue;
+ protected BSPMessageCompressor<M> compressor;
+
/*
* (non-Javadoc)
* @see org.apache.hama.bsp.message.MessageManager#init(org.apache.hama.bsp.
@@ -80,7 +85,7 @@ public abstract class AbstractMessageMan
*/
@Override
public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, M> peer,
- Configuration conf, InetSocketAddress peerAddress) {
+ HamaConfiguration conf, InetSocketAddress peerAddress) {
this.messageListenerQueue = new LinkedList<MessageEventListener<M>>();
this.attemptId = attemptId;
this.peer = peer;
@@ -88,8 +93,10 @@ public abstract class AbstractMessageMan
this.localQueue = getReceiverQueue();
this.localQueueForNextIteration = getSynchronizedReceiverQueue();
this.maxCachedConnections = conf.getInt(MAX_CACHED_CONNECTIONS_KEY, 100);
+
+ this.compressor = new BSPMessageCompressorFactory<M>().getCompressor(conf);
this.outgoingMessageManager = getOutgoingMessageManager();
- this.outgoingMessageManager.init(conf);
+ this.outgoingMessageManager.init(conf, compressor);
}
/*
@@ -271,8 +278,10 @@ public abstract class AbstractMessageMan
}
@Override
- public void loopBackMessages(BSPMessageBundle<? extends Writable> bundle)
- throws IOException {
+ public void loopBackMessages(BSPMessageBundle<M> bundle) throws IOException {
+ bundle.setCompressor(compressor,
+ conf.getLong("hama.messenger.compression.threshold", 128));
+
Iterator<? extends Writable> it = bundle.iterator();
while (it.hasNext()) {
loopBackMessage(it.next());
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManager.java?rev=1570039&r1=1570038&r2=1570039&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManager.java Thu Feb 20 01:39:45 2014
@@ -21,7 +21,6 @@ import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hama.bsp.BSPMessageBundle;
-import org.apache.hama.bsp.message.compress.BSPCompressedBundle;
import org.apache.hama.ipc.HamaRPCProtocolVersion;
/**
@@ -47,12 +46,4 @@ public interface HamaMessageManager<M ex
*/
public void put(BSPMessageBundle<M> messages) throws IOException;
- /**
- * This method puts a compressed message bundle for the next iteration.
- * Accessed concurrently from protocol, this must be sychronized internally.
- *
- * @param compMsgBundle
- */
- public void put(BSPCompressedBundle compMsgBundle) throws IOException;
-
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java?rev=1570039&r1=1570038&r2=1570039&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java Thu Feb 20 01:39:45 2014
@@ -26,11 +26,11 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
+import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.BSPPeerImpl;
import org.apache.hama.bsp.TaskAttemptID;
-import org.apache.hama.bsp.message.compress.BSPCompressedBundle;
import org.apache.hama.ipc.HamaRPCProtocolVersion;
import org.apache.hama.ipc.RPC;
import org.apache.hama.ipc.RPC.Server;
@@ -41,7 +41,7 @@ import org.apache.hama.util.LRUCache;
*
*/
public final class HamaMessageManagerImpl<M extends Writable> extends
- CompressableMessageManager<M> implements HamaMessageManager<M> {
+ AbstractMessageManager<M> implements HamaMessageManager<M> {
private static final Log LOG = LogFactory
.getLog(HamaMessageManagerImpl.class);
@@ -53,9 +53,8 @@ public final class HamaMessageManagerImp
@SuppressWarnings("serial")
@Override
public final void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, M> peer,
- Configuration conf, InetSocketAddress peerAddress) {
+ HamaConfiguration conf, InetSocketAddress peerAddress) {
super.init(attemptId, peer, conf, peerAddress);
- super.initCompression(conf);
startRPCServer(conf, peerAddress);
peersLRUCache = new LRUCache<InetSocketAddress, HamaMessageManager<M>>(
maxCachedConnections) {
@@ -119,16 +118,8 @@ public final class HamaMessageManagerImp
throw new IllegalArgumentException("Can not find " + addr.toString()
+ " to transfer messages to!");
} else {
- if (compressor != null
- && (bundle.getLength() > conf.getLong(
- "hama.messenger.compression.threshold", 1048576))) {
-
- BSPCompressedBundle compMsgBundle = compressor.compressBundle(bundle);
- bspPeerConnection.put(compMsgBundle);
- peer.incrementCounter(BSPPeerImpl.PeerCounter.COMPRESSED_MESSAGES, 1L);
- } else {
- bspPeerConnection.put(bundle);
- }
+ peer.incrementCounter(BSPPeerImpl.PeerCounter.MESSAGE_BYTES_TRANSFERED, bundle.getLength());
+ bspPeerConnection.put(bundle);
}
}
@@ -165,12 +156,6 @@ public final class HamaMessageManagerImp
}
@Override
- public final void put(BSPCompressedBundle compMsgBundle) throws IOException {
- BSPMessageBundle<M> bundle = compressor.decompressBundle(compMsgBundle);
- loopBackMessages(bundle);
- }
-
- @Override
public final long getProtocolVersion(String arg0, long arg1)
throws IOException {
return versionID;
@@ -183,4 +168,5 @@ public final class HamaMessageManagerImp
}
return null;
}
+
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java?rev=1570039&r1=1570038&r2=1570039&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java Thu Feb 20 01:39:45 2014
@@ -22,8 +22,8 @@ import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.Map.Entry;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
+import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.TaskAttemptID;
@@ -50,7 +50,7 @@ public interface MessageManager<M extend
*
*/
public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, M> peer,
- Configuration conf, InetSocketAddress peerAddress);
+ HamaConfiguration conf, InetSocketAddress peerAddress);
/**
* Close is called after a task ran. Should be used to cleanup things e.G.
@@ -99,7 +99,7 @@ public interface MessageManager<M extend
/**
* Send the messages to self to receive in the next superstep.
*/
- public void loopBackMessages(BSPMessageBundle<? extends Writable> bundle)
+ public void loopBackMessages(BSPMessageBundle<M> bundle)
throws IOException;
/**
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingMessageManager.java?rev=1570039&r1=1570038&r2=1570039&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingMessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingMessageManager.java Thu Feb 20 01:39:45 2014
@@ -21,19 +21,19 @@ import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.Map.Entry;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
+import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
public interface OutgoingMessageManager<M extends Writable> {
- public void init(Configuration conf);
-
+ public void init(HamaConfiguration conf, BSPMessageCompressor<M> compressor);
+
public void addMessage(String peerName, M msg);
public void clear();
public Iterator<Entry<InetSocketAddress, BSPMessageBundle<M>>> getBundleIterator();
-
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java?rev=1570039&r1=1570038&r2=1570039&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java Thu Feb 20 01:39:45 2014
@@ -22,30 +22,34 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map.Entry;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hama.Constants;
+import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.Combiner;
+import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
import org.apache.hama.util.BSPNetUtils;
import org.apache.hama.util.ReflectionUtils;
public class OutgoingPOJOMessageBundle<M extends Writable> implements
OutgoingMessageManager<M> {
+ private HamaConfiguration conf;
+ private BSPMessageCompressor<M> compressor;
private Combiner<M> combiner;
private final HashMap<String, InetSocketAddress> peerSocketCache = new HashMap<String, InetSocketAddress>();
private HashMap<InetSocketAddress, BSPMessageBundle<M>> outgoingBundles = new HashMap<InetSocketAddress, BSPMessageBundle<M>>();
@SuppressWarnings("unchecked")
@Override
- public void init(Configuration conf) {
+ public void init(HamaConfiguration conf, BSPMessageCompressor<M> compressor) {
+ this.conf = conf;
+ this.compressor = compressor;
final String combinerName = conf.get(Constants.COMBINER_CLASS);
if (combinerName != null) {
try {
- Combiner<M> combiner = (Combiner<M>) ReflectionUtils.newInstance(conf
+ this.combiner = (Combiner<M>) ReflectionUtils.newInstance(conf
.getClassByName(combinerName));
- this.combiner = combiner;
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
@@ -61,6 +65,8 @@ public class OutgoingPOJOMessageBundle<M
BSPMessageBundle<M> bundle = outgoingBundles.get(targetPeerAddress);
bundle.addMessage(msg);
BSPMessageBundle<M> combined = new BSPMessageBundle<M>();
+ combined.setCompressor(compressor,
+ conf.getLong("hama.messenger.compression.threshold", 128));
combined.addMessage(combiner.combine(bundle));
outgoingBundles.put(targetPeerAddress, combined);
} else {
@@ -79,7 +85,10 @@ public class OutgoingPOJOMessageBundle<M
}
if (!outgoingBundles.containsKey(targetPeerAddress)) {
- outgoingBundles.put(targetPeerAddress, new BSPMessageBundle<M>());
+ BSPMessageBundle<M> bundle = new BSPMessageBundle<M>();
+ bundle.setCompressor(compressor,
+ conf.getLong("hama.messenger.compression.threshold", 128));
+ outgoingBundles.put(targetPeerAddress, bundle);
}
return targetPeerAddress;
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressor.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressor.java?rev=1570039&r1=1570038&r2=1570039&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressor.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressor.java Thu Feb 20 01:39:45 2014
@@ -20,32 +20,16 @@ package org.apache.hama.bsp.message.comp
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Writable;
-import org.apache.hama.bsp.BSPMessageBundle;
/**
- * Provides utilities for compressing and decompressing BSPMessageBundle.
+ * Provides utilities for compressing and decompressing byte array.
*
*/
public abstract class BSPMessageCompressor<M extends Writable> {
public static final Log LOG = LogFactory.getLog(BSPMessageCompressor.class);
- /**
- * Compresses the BSPMessageBundle and returns it in form of a
- * BSPCompressedBundle.
- *
- * @param bundle
- * @return
- */
- public abstract BSPCompressedBundle compressBundle(BSPMessageBundle<M> bundle);
+ public abstract byte[] compress(byte[] bytes);
- /**
- * Decompresses a BSPCompressedBundle and returns the corresponding
- * BSPMessageBundle.
- *
- * @param compMsgBundle
- * @return
- */
- public abstract BSPMessageBundle<M> decompressBundle(
- BSPCompressedBundle compMsgBundle);
+ public abstract byte[] decompress(byte[] compressedBytes);
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java?rev=1570039&r1=1570038&r2=1570039&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java Thu Feb 20 01:39:45 2014
@@ -23,11 +23,11 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
-import org.apache.hama.bsp.BSPMessageBundle;
public class Bzip2Compressor<M extends Writable> extends
BSPMessageCompressor<M> {
@@ -35,23 +35,19 @@ public class Bzip2Compressor<M extends W
private final BZip2Codec codec = new BZip2Codec();
@Override
- public BSPCompressedBundle compressBundle(BSPMessageBundle<M> bundle) {
- BSPCompressedBundle compMsgBundle = null;
+ public byte[] compress(byte[] bytes) {
ByteArrayOutputStream bos = null;
CompressionOutputStream sos = null;
DataOutputStream dos = null;
+ byte[] compressedBytes = null;
try {
bos = new ByteArrayOutputStream();
sos = codec.createOutputStream(bos);
dos = new DataOutputStream(sos);
-
- bundle.write(dos);
dos.close(); // Flush the stream as no more data will be sent.
- byte[] data = bos.toByteArray();
- compMsgBundle = new BSPCompressedBundle(data);
-
+ compressedBytes = bos.toByteArray();
} catch (IOException ioe) {
LOG.error("Unable to compress", ioe);
} finally {
@@ -62,7 +58,7 @@ public class Bzip2Compressor<M extends W
LOG.warn("Failed to close compression streams.", e);
}
}
- return compMsgBundle;
+ return compressedBytes;
}
/**
@@ -73,20 +69,17 @@ public class Bzip2Compressor<M extends W
* @return
*/
@Override
- public BSPMessageBundle<M> decompressBundle(BSPCompressedBundle compMsgBundle) {
+ public byte[] decompress(byte[] compressedBytes) {
ByteArrayInputStream bis = null;
CompressionInputStream sis = null;
DataInputStream dis = null;
- BSPMessageBundle<M> bundle = new BSPMessageBundle<M>();
+ byte[] bytes = null;
try {
- byte[] data = compMsgBundle.getData();
- bis = new ByteArrayInputStream(data);
+ bis = new ByteArrayInputStream(compressedBytes);
sis = codec.createInputStream(bis);
dis = new DataInputStream(sis);
-
- bundle.readFields(dis);
-
+ bytes = IOUtils.toByteArray(dis);
} catch (IOException ioe) {
LOG.error("Unable to decompress.", ioe);
} finally {
@@ -99,7 +92,7 @@ public class Bzip2Compressor<M extends W
}
}
- return bundle;
+ return bytes;
}
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/SnappyCompressor.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/SnappyCompressor.java?rev=1570039&r1=1570038&r2=1570039&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/SnappyCompressor.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/SnappyCompressor.java Thu Feb 20 01:39:45 2014
@@ -23,8 +23,8 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Writable;
-import org.apache.hama.bsp.BSPMessageBundle;
import org.xerial.snappy.SnappyInputStream;
import org.xerial.snappy.SnappyOutputStream;
@@ -32,22 +32,21 @@ public class SnappyCompressor<M extends
BSPMessageCompressor<M> {
@Override
- public BSPCompressedBundle compressBundle(BSPMessageBundle<M> bundle) {
- BSPCompressedBundle compMsgBundle = null;
+ public byte[] compress(byte[] bytes) {
ByteArrayOutputStream bos = null;
SnappyOutputStream sos = null;
DataOutputStream dos = null;
+ byte[] compressedBytes = null;
try {
bos = new ByteArrayOutputStream();
sos = new SnappyOutputStream(bos);
dos = new DataOutputStream(sos);
- bundle.write(dos);
+ dos.write(bytes);
dos.close(); // Flush the stream as no more data will be sent.
- byte[] data = bos.toByteArray();
- compMsgBundle = new BSPCompressedBundle(data);
+ compressedBytes = bos.toByteArray();
} catch (IOException ioe) {
LOG.error("Unable to compress", ioe);
@@ -59,7 +58,7 @@ public class SnappyCompressor<M extends
LOG.warn("Failed to close compression streams.", e);
}
}
- return compMsgBundle;
+ return compressedBytes;
}
/**
@@ -70,20 +69,18 @@ public class SnappyCompressor<M extends
* @return
*/
@Override
- public BSPMessageBundle<M> decompressBundle(BSPCompressedBundle compMsgBundle) {
+ public byte[] decompress(byte[] compressedBytes) {
ByteArrayInputStream bis = null;
SnappyInputStream sis = null;
DataInputStream dis = null;
- BSPMessageBundle<M> bundle = new BSPMessageBundle<M>();
+ byte[] bytes = null;
try {
- byte[] data = compMsgBundle.getData();
- bis = new ByteArrayInputStream(data);
+ bis = new ByteArrayInputStream(compressedBytes);
sis = new SnappyInputStream(bis);
dis = new DataInputStream(sis);
- bundle.readFields(dis);
-
+ bytes = IOUtils.toByteArray(dis);
} catch (IOException ioe) {
LOG.error("Unable to decompress.", ioe);
} finally {
@@ -96,7 +93,7 @@ public class SnappyCompressor<M extends
}
}
- return bundle;
+ return bytes;
}
}
Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1570039&r1=1570038&r2=1570039&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Thu Feb 20 01:39:45 2014
@@ -76,7 +76,7 @@ public class TestCheckpoint extends Test
@Override
public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, Text> peer,
- Configuration conf, InetSocketAddress peerAddress) {
+ HamaConfiguration conf, InetSocketAddress peerAddress) {
// TODO Auto-generated method stub
}
@@ -130,9 +130,8 @@ public class TestCheckpoint extends Test
listener.onMessageReceived(message);
}
- @SuppressWarnings("unchecked")
@Override
- public void loopBackMessages(BSPMessageBundle<? extends Writable> bundle) {
+ public void loopBackMessages(BSPMessageBundle<Text> bundle) {
this.loopbackBundle = (BSPMessageBundle<Text>) bundle;
}
@@ -146,11 +145,11 @@ public class TestCheckpoint extends Test
this.listener = listener;
}
- @Override
- public InetSocketAddress getListenerAddress() {
- // TODO Auto-generated method stub
- return null;
- }
+ @Override
+ public InetSocketAddress getListenerAddress() {
+ // TODO Auto-generated method stub
+ return null;
+ }
}
@@ -444,8 +443,8 @@ public class TestCheckpoint extends Test
}
private static void checkSuperstepMsgCount(PeerSyncClient syncClient,
- @SuppressWarnings("rawtypes") BSPPeer bspTask, BSPJob job, long step,
- long count) {
+ @SuppressWarnings("rawtypes")
+ BSPPeer bspTask, BSPJob job, long step, long count) {
ArrayWritable writableVal = new ArrayWritable(LongWritable.class);
@@ -648,7 +647,7 @@ public class TestCheckpoint extends Test
BSPMessageBundle<Text> bundleRead = messenger.getLoopbackBundle();
assertEquals(5, bundleRead.size());
-
+
String recoveredMsg = bundleRead.iterator().next().toString();
assertEquals(recoveredMsg, "data");
dfs.delete(new Path("checkpoint"), true);
Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java?rev=1570039&r1=1570038&r2=1570039&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java Thu Feb 20 01:39:45 2014
@@ -17,17 +17,21 @@
*/
package org.apache.hama.bsp.message.compress;
-import java.util.Iterator;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hadoop.io.IntWritable;
import org.apache.hama.bsp.message.type.IntegerMessage;
public class TestBSPMessageCompressor extends TestCase {
- public void testCompression() {
+ public void testCompression() throws IOException {
Configuration configuration = new Configuration();
BSPMessageCompressor<IntegerMessage> compressor = new BSPMessageCompressorFactory<IntegerMessage>()
.getCompressor(configuration);
@@ -40,24 +44,27 @@ public class TestBSPMessageCompressor ex
assertNotNull(compressor);
- int n = 20;
- BSPMessageBundle<IntegerMessage> bundle = new BSPMessageBundle<IntegerMessage>();
- IntegerMessage[] dmsg = new IntegerMessage[n];
-
- for (int i = 1; i <= n; i++) {
- dmsg[i - 1] = new IntegerMessage("" + i, i);
- bundle.addMessage(dmsg[i - 1]);
- }
-
- BSPCompressedBundle compBundle = compressor.compressBundle(bundle);
- BSPMessageBundle<IntegerMessage> uncompBundle = compressor
- .decompressBundle(compBundle);
-
- int i = 1;
- Iterator<IntegerMessage> it = uncompBundle.iterator();
- while(it.hasNext()) {
- assertEquals((int) it.next().getData(), i);
- i++;
- }
+ IntWritable a = new IntWritable(123);
+ IntWritable b = new IntWritable(321);
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(bos);
+ a.write(dos);
+ b.write(dos);
+
+ byte[] x = bos.toByteArray();
+
+ byte[] compressed = compressor.compress(x);
+ byte[] decompressed = compressor.decompress(compressed);
+
+ ByteArrayInputStream bis = new ByteArrayInputStream(decompressed);
+ DataInputStream dis = new DataInputStream(bis);
+
+ IntWritable c = new IntWritable();
+ c.readFields(dis);
+ assertEquals(123, c.get());
+
+ IntWritable d = new IntWritable();
+ d.readFields(dis);
+ assertEquals(321, d.get());
}
}
Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java?rev=1570039&r1=1570038&r2=1570039&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java Thu Feb 20 01:39:45 2014
@@ -38,6 +38,7 @@ import org.apache.hama.bsp.ClusterStatus
import org.apache.hama.bsp.FileOutputFormat;
import org.apache.hama.bsp.NullInputFormat;
import org.apache.hama.bsp.TextOutputFormat;
+import org.apache.hama.bsp.message.compress.SnappyCompressor;
import org.apache.hama.bsp.sync.SyncException;
public class PiEstimator {
@@ -116,6 +117,9 @@ public class PiEstimator {
HamaConfiguration conf = new HamaConfiguration();
BSPJob bsp = new BSPJob(conf, PiEstimator.class);
+ bsp.setCompressionCodec(SnappyCompressor.class);
+ bsp.setCompressionThreshold(40);
+
// Set the job name
bsp.setJobName("Pi Estimation Example");
bsp.setBspClass(MyEstimator.class);
Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterJobDriver.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterJobDriver.java?rev=1570039&r1=1570038&r2=1570039&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterJobDriver.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterJobDriver.java Thu Feb 20 01:39:45 2014
@@ -18,6 +18,8 @@
package org.apache.hama.examples;
+import java.io.IOException;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
@@ -28,14 +30,13 @@ import org.apache.hama.HamaConfiguration
import org.apache.hama.bsp.HashPartitioner;
import org.apache.hama.bsp.TextInputFormat;
import org.apache.hama.bsp.TextOutputFormat;
+import org.apache.hama.bsp.message.compress.SnappyCompressor;
import org.apache.hama.graph.GraphJob;
import org.apache.hama.ml.semiclustering.SemiClusterMessage;
import org.apache.hama.ml.semiclustering.SemiClusterTextReader;
import org.apache.hama.ml.semiclustering.SemiClusterVertexOutputWriter;
import org.apache.hama.ml.semiclustering.SemiClusteringVertex;
-import java.io.IOException;
-
public class SemiClusterJobDriver {
protected static final Log LOG = LogFactory
@@ -50,6 +51,10 @@ public class SemiClusterJobDriver {
public static void startTask(HamaConfiguration conf) throws IOException,
InterruptedException, ClassNotFoundException {
GraphJob semiClusterJob = new GraphJob(conf, SemiClusterJobDriver.class);
+ // 80,887,377
+ semiClusterJob.setCompressionCodec(SnappyCompressor.class);
+ semiClusterJob.setCompressionThreshold(10);
+
semiClusterJob
.setVertexOutputWriterClass(SemiClusterVertexOutputWriter.class);
semiClusterJob.setJobName("SemiClusterJob");
Modified: hama/trunk/examples/src/test/java/org/apache/hama/examples/SemiClusterMatchingTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/SemiClusterMatchingTest.java?rev=1570039&r1=1570038&r2=1570039&view=diff
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/SemiClusterMatchingTest.java (original)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/SemiClusterMatchingTest.java Thu Feb 20 01:39:45 2014
@@ -45,6 +45,7 @@ import org.apache.hama.HamaConfiguration
import org.apache.hama.bsp.HashPartitioner;
import org.apache.hama.bsp.TextInputFormat;
import org.apache.hama.bsp.TextOutputFormat;
+import org.apache.hama.bsp.message.compress.SnappyCompressor;
import org.apache.hama.graph.GraphJob;
import org.apache.hama.ml.semiclustering.SemiClusterMessage;
import org.apache.hama.ml.semiclustering.SemiClusterTextReader;
@@ -56,7 +57,6 @@ import org.junit.Test;
public class SemiClusterMatchingTest extends TestCase {
private static String INPUT = "/tmp/graph.txt";
private static String OUTPUT = "/tmp/graph-semiCluster";
- private static final String requestedGraphJobMaxIterationString = "hama.graph.max.iteration";
private static final String semiClusterMaximumVertexCount = "semicluster.max.vertex.count";
private static final String graphJobMessageSentCount = "semicluster.max.message.sent.count";
private static final String graphJobVertexMaxClusterCount = "vertex.max.cluster.count";
@@ -225,11 +225,15 @@ public class SemiClusterMatchingTest ext
try {
HamaConfiguration conf = new HamaConfiguration();
- conf.setInt(requestedGraphJobMaxIterationString, 15);
conf.setInt(semiClusterMaximumVertexCount, 100);
conf.setInt(graphJobMessageSentCount, 100);
conf.setInt(graphJobVertexMaxClusterCount, 1);
GraphJob semiClusterJob = new GraphJob(conf, SemiClusterJobDriver.class);
+ semiClusterJob.setMaxIteration(15);
+
+ semiClusterJob.setCompressionCodec(SnappyCompressor.class);
+ semiClusterJob.setCompressionThreshold(10);
+
semiClusterJob
.setVertexOutputWriterClass(SemiClusterVertexOutputWriter.class);
semiClusterJob.setJobName("SemiClusterJob");
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java?rev=1570039&r1=1570038&r2=1570039&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java Thu Feb 20 01:39:45 2014
@@ -27,13 +27,14 @@ import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hama.Constants;
+import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.Combiner;
import org.apache.hama.bsp.message.OutgoingMessageManager;
+import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
import org.apache.hama.util.BSPNetUtils;
public class OutgoingVertexMessagesManager<M extends Writable> implements
@@ -41,6 +42,8 @@ public class OutgoingVertexMessagesManag
protected static final Log LOG = LogFactory
.getLog(OutgoingVertexMessagesManager.class);
+ private HamaConfiguration conf;
+ private BSPMessageCompressor<GraphJobMessage> compressor;
private Combiner<Writable> combiner;
private final HashMap<String, InetSocketAddress> peerSocketCache = new HashMap<String, InetSocketAddress>();
private HashMap<InetSocketAddress, BSPMessageBundle<GraphJobMessage>> outgoingBundles = new HashMap<InetSocketAddress, BSPMessageBundle<GraphJobMessage>>();
@@ -51,7 +54,10 @@ public class OutgoingVertexMessagesManag
@SuppressWarnings("unchecked")
@Override
- public void init(Configuration conf) {
+ public void init(HamaConfiguration conf,
+ BSPMessageCompressor<GraphJobMessage> compressor) {
+ this.conf = conf;
+ this.compressor = compressor;
if (!conf.getClass(Constants.COMBINER_CLASS, Combiner.class).equals(
Combiner.class)) {
LOG.debug("Combiner class: " + conf.get(Constants.COMBINER_CLASS));
@@ -112,8 +118,9 @@ public class OutgoingVertexMessagesManag
}
if (!outgoingBundles.containsKey(targetPeerAddress)) {
- outgoingBundles.put(targetPeerAddress,
- new BSPMessageBundle<GraphJobMessage>());
+ BSPMessageBundle<GraphJobMessage> bundle = new BSPMessageBundle<GraphJobMessage>();
+ bundle.setCompressor(compressor, conf.getLong("hama.messenger.compression.threshold", 128));
+ outgoingBundles.put(targetPeerAddress, bundle);
}
return targetPeerAddress;
}
@@ -141,4 +148,5 @@ public class OutgoingVertexMessagesManag
vertexMessageMap.clear();
return outgoingBundles.entrySet().iterator();
}
+
}
Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java?rev=1570039&r1=1570038&r2=1570039&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java (original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java Thu Feb 20 01:39:45 2014
@@ -36,6 +36,7 @@ import org.apache.hama.bsp.HashPartition
import org.apache.hama.bsp.SequenceFileInputFormat;
import org.apache.hama.bsp.SequenceFileOutputFormat;
import org.apache.hama.bsp.TestBSPMasterGroomServer;
+import org.apache.hama.bsp.message.compress.SnappyCompressor;
import org.apache.hama.commons.io.TextArrayWritable;
import org.apache.hama.graph.example.PageRank;
import org.apache.hama.graph.example.PageRank.PagerankSeqReader;
@@ -85,6 +86,7 @@ public class TestSubmitGraphJob extends
// set the defaults
bsp.setMaxIteration(30);
+ bsp.setCompressionCodec(SnappyCompressor.class);
bsp.setAggregatorClass(AverageAggregator.class);
bsp.setInputFormat(SequenceFileInputFormat.class);