You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ik...@apache.org on 2016/03/16 00:48:59 UTC
git commit: updated refs/heads/trunk to 4170eeb
Repository: giraph
Updated Branches:
refs/heads/trunk fafecee71 -> 4170eeb05
unsafe byte readers/writers
Summary: using unsafe readers/writers
Test Plan:
tested on PageRank app, and Fanout computation. In both cases, there is a ~20% speedup
JIRA: https://issues.apache.org/jira/browse/GIRAPH-1049
Reviewers: sergey.edunov, maja.kabiljo, dionysis.logothetis, ikabiljo
Reviewed By: ikabiljo
Differential Revision: https://reviews.facebook.net/D55509
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/4170eeb0
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/4170eeb0
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/4170eeb0
Branch: refs/heads/trunk
Commit: 4170eeb054ba1414eceb08ec6c0fbdbdb17eb5c2
Parents: fafecee
Author: spupyrev <sp...@fb.com>
Authored: Tue Mar 15 16:47:15 2016 -0700
Committer: Igor Kabiljo <ik...@fb.com>
Committed: Tue Mar 15 16:48:42 2016 -0700
----------------------------------------------------------------------
.../api/local/InternalMessageStore.java | 46 ++++++++++++--------
1 file changed, 29 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/4170eeb0/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalMessageStore.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalMessageStore.java
index 6c0cccb..92d9821 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalMessageStore.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalMessageStore.java
@@ -34,8 +34,8 @@ import org.apache.giraph.conf.MessageClasses;
import org.apache.giraph.factories.MessageValueFactory;
import org.apache.giraph.types.ops.TypeOps;
import org.apache.giraph.types.ops.TypeOpsUtils;
-import org.apache.giraph.utils.ExtendedByteArrayDataInput;
-import org.apache.giraph.utils.ExtendedByteArrayDataOutput;
+import org.apache.giraph.utils.ExtendedDataInput;
+import org.apache.giraph.utils.ExtendedDataOutput;
import org.apache.giraph.utils.UnsafeReusableByteArrayInput;
import org.apache.giraph.utils.WritableUtils;
import org.apache.hadoop.io.Writable;
@@ -69,7 +69,7 @@ interface InternalMessageStore
abstract class InternalConcurrentMessageStore
<I extends WritableComparable, M extends Writable, R>
implements InternalMessageStore<I, M> {
- protected final ConcurrentHashMap<I, R> received =
+ private final ConcurrentHashMap<I, R> received =
new ConcurrentHashMap<>();
private final Class<I> idClass;
@@ -102,6 +102,10 @@ interface InternalMessageStore
return value;
}
+ R removeFor(I id) {
+ return received.remove(id);
+ }
+
abstract R createNewReceiver();
@Override
@@ -118,8 +122,9 @@ interface InternalMessageStore
public static <I extends WritableComparable, M extends Writable>
InternalMessageStore<I, M> createMessageStore(
- final ImmutableClassesGiraphConfiguration<I, ?, ?> conf,
- final MessageClasses<I, M> messageClasses) {
+ final ImmutableClassesGiraphConfiguration<I, ?, ?> conf,
+ final MessageClasses<I, M> messageClasses
+ ) {
MessageCombiner<? super I, M> combiner =
messageClasses.createMessageCombiner(conf);
if (combiner != null) {
@@ -132,8 +137,9 @@ interface InternalMessageStore
messageClasses.createMessageValueFactory(conf));
} else {
return new InternalByteMessageStore<>(
- conf.getVertexIdClass(),
- messageClasses.createMessageValueFactory(conf));
+ conf.getVertexIdClass(),
+ messageClasses.createMessageValueFactory(conf),
+ conf);
}
}
@@ -175,7 +181,7 @@ interface InternalMessageStore
@Override
public Iterable<M> takeMessages(I id) {
- M message = received.remove(id);
+ M message = removeFor(id);
if (message != null) {
return Collections.singleton(message);
} else {
@@ -206,18 +212,22 @@ interface InternalMessageStore
static class InternalByteMessageStore
<I extends WritableComparable, M extends Writable>
extends InternalConcurrentMessageStore<I, M,
- ExtendedByteArrayDataOutput> {
+ ExtendedDataOutput> {
private final MessageValueFactory<M> messageFactory;
+ private final ImmutableClassesGiraphConfiguration<I, ?, ?> conf;
public InternalByteMessageStore(
- Class<I> idClass, MessageValueFactory<M> messageFactory) {
+ Class<I> idClass, MessageValueFactory<M> messageFactory,
+ ImmutableClassesGiraphConfiguration<I, ?, ?> conf
+ ) {
super(idClass);
this.messageFactory = messageFactory;
+ this.conf = conf;
}
@Override
public Iterable<M> takeMessages(I id) {
- final ExtendedByteArrayDataOutput out = received.remove(id);
+ final ExtendedDataOutput out = removeFor(id);
if (out == null) {
return null;
}
@@ -225,8 +235,10 @@ interface InternalMessageStore
return new Iterable<M>() {
@Override
public Iterator<M> iterator() {
- final ExtendedByteArrayDataInput in = new ExtendedByteArrayDataInput(
- out.getByteArray(), 0, out.getPos());
+ final ExtendedDataInput in = conf.createExtendedDataInput(
+ out.getByteArray(), 0, out.getPos()
+ );
+
final M message = messageFactory.newInstance();
return new AbstractIterator<M>() {
@Override
@@ -248,7 +260,7 @@ interface InternalMessageStore
@Override
public void sendMessage(I id, M message) {
- ExtendedByteArrayDataOutput out = getReceiverFor(id);
+ ExtendedDataOutput out = getReceiverFor(id);
synchronized (out) {
try {
@@ -260,8 +272,8 @@ interface InternalMessageStore
}
@Override
- ExtendedByteArrayDataOutput createNewReceiver() {
- return new ExtendedByteArrayDataOutput();
+ ExtendedDataOutput createNewReceiver() {
+ return conf.createExtendedDataOutput();
}
}
@@ -285,7 +297,7 @@ interface InternalMessageStore
@Override
public Iterable<M> takeMessages(I id) {
- final List<byte[]> out = received.remove(id);
+ final List<byte[]> out = removeFor(id);
if (out == null) {
return null;
}