You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ik...@apache.org on 2016/08/31 17:23:39 UTC
[2/3] git commit: updated refs/heads/trunk to 2117d1d
http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
index 715bf45..77b2689 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/IntFloatMessageStore.java
@@ -18,6 +18,8 @@
package org.apache.giraph.comm.messages.primitives;
+import com.google.common.collect.Lists;
+
import it.unimi.dsi.fastutil.ints.Int2FloatMap;
import it.unimi.dsi.fastutil.ints.Int2FloatOpenHashMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
@@ -30,17 +32,14 @@ import java.io.IOException;
import java.util.Collections;
import java.util.List;
-import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.combiner.MessageCombiner;
import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.comm.messages.PartitionSplitInfo;
import org.apache.giraph.utils.EmptyIterable;
import org.apache.giraph.utils.VertexIdMessageIterator;
import org.apache.giraph.utils.VertexIdMessages;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Writable;
-
-import com.google.common.collect.Lists;
/**
* Special message store to be used when ids are IntWritable and messages
@@ -55,26 +54,26 @@ public class IntFloatMessageStore
/** Message messageCombiner */
private final
MessageCombiner<? super IntWritable, FloatWritable> messageCombiner;
- /** Service worker */
- private final CentralizedServiceWorker<IntWritable, ?, ?> service;
+ /** Partition split info */
+ private final PartitionSplitInfo<IntWritable> partitionInfo;
/**
* Constructor
*
- * @param service Service worker
+ * @param partitionInfo Partition split info
* @param messageCombiner Message messageCombiner
*/
public IntFloatMessageStore(
- CentralizedServiceWorker<IntWritable, Writable, Writable> service,
- MessageCombiner<? super IntWritable, FloatWritable> messageCombiner) {
- this.service = service;
+ PartitionSplitInfo<IntWritable> partitionInfo,
+ MessageCombiner<? super IntWritable, FloatWritable> messageCombiner
+ ) {
+ this.partitionInfo = partitionInfo;
this.messageCombiner = messageCombiner;
map = new Int2ObjectOpenHashMap<Int2FloatOpenHashMap>();
- for (int partitionId : service.getPartitionStore().getPartitionIds()) {
+ for (int partitionId : partitionInfo.getPartitionIds()) {
Int2FloatOpenHashMap partitionMap = new Int2FloatOpenHashMap(
- (int) service.getPartitionStore()
- .getPartitionVertexCount(partitionId));
+ (int) partitionInfo.getPartitionVertexCount(partitionId));
map.put(partitionId, partitionMap);
}
}
@@ -91,7 +90,7 @@ public class IntFloatMessageStore
* @return Map which holds messages for partition which vertex belongs to.
*/
private Int2FloatOpenHashMap getPartitionMap(IntWritable vertexId) {
- return map.get(service.getPartitionId(vertexId));
+ return map.get(partitionInfo.getPartitionId(vertexId));
}
@Override
@@ -117,12 +116,27 @@ public class IntFloatMessageStore
reusableMessage);
message = reusableCurrentMessage.get();
}
+ // FIXME: messageCombiner should create an initial message instead
partitionMap.put(vertexId, message);
}
}
}
@Override
+  public void addMessage(
+    IntWritable vertexId,
+    FloatWritable message
+  ) throws IOException {
+    int id = vertexId.get();
+    Int2FloatOpenHashMap partitionMap = getPartitionMap(vertexId);
+    // The per-partition map is shared, so the read-combine-write sequence
+    // below has to happen under its lock.
+    synchronized (partitionMap) {
+      float value = message.get();
+      // Combine only when a value is really stored for this vertex. A
+      // primitive map answers get() for a missing key with its default
+      // return value, and feeding that to the combiner as if it were a
+      // received message corrupts non-additive combiners (min, max, ...).
+      if (partitionMap.containsKey(id)) {
+        FloatWritable current = new FloatWritable(partitionMap.get(id));
+        messageCombiner.combine(vertexId, current, message);
+        value = current.get();
+      }
+      // FIXME: messageCombiner should create an initial message instead
+      partitionMap.put(id, value);
+    }
+  }
+
+ @Override
public void finalizeStore() {
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
index 4fc4843..19aede4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/LongDoubleMessageStore.java
@@ -30,15 +30,14 @@ import java.io.IOException;
import java.util.Collections;
import java.util.List;
-import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.combiner.MessageCombiner;
import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.comm.messages.PartitionSplitInfo;
import org.apache.giraph.utils.EmptyIterable;
import org.apache.giraph.utils.VertexIdMessageIterator;
import org.apache.giraph.utils.VertexIdMessages;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
import com.google.common.collect.Lists;
@@ -56,26 +55,25 @@ public class LongDoubleMessageStore
private final
MessageCombiner<? super LongWritable, DoubleWritable> messageCombiner;
/** Partition split info */
- private final CentralizedServiceWorker<LongWritable, ?, ?> service;
+ private final PartitionSplitInfo<LongWritable> partitionInfo;
/**
* Constructor
*
- * @param service Service worker
+ * @param partitionInfo Partition split info
* @param messageCombiner Message messageCombiner
*/
public LongDoubleMessageStore(
- CentralizedServiceWorker<LongWritable, Writable, Writable> service,
- MessageCombiner<? super LongWritable, DoubleWritable> messageCombiner) {
- this.service = service;
- this.messageCombiner =
- messageCombiner;
+ PartitionSplitInfo<LongWritable> partitionInfo,
+ MessageCombiner<? super LongWritable, DoubleWritable> messageCombiner
+ ) {
+ this.partitionInfo = partitionInfo;
+ this.messageCombiner = messageCombiner;
map = new Int2ObjectOpenHashMap<Long2DoubleOpenHashMap>();
- for (int partitionId : service.getPartitionStore().getPartitionIds()) {
+ for (int partitionId : partitionInfo.getPartitionIds()) {
Long2DoubleOpenHashMap partitionMap = new Long2DoubleOpenHashMap(
- (int) service.getPartitionStore()
- .getPartitionVertexCount(partitionId));
+ (int) partitionInfo.getPartitionVertexCount(partitionId));
map.put(partitionId, partitionMap);
}
}
@@ -92,7 +90,7 @@ public class LongDoubleMessageStore
* @return Map which holds messages for partition which vertex belongs to.
*/
private Long2DoubleOpenHashMap getPartitionMap(LongWritable vertexId) {
- return map.get(service.getPartitionId(vertexId));
+ return map.get(partitionInfo.getPartitionId(vertexId));
}
@Override
@@ -118,12 +116,27 @@ public class LongDoubleMessageStore
reusableMessage);
message = reusableCurrentMessage.get();
}
+ // FIXME: messageCombiner should create an initial message instead
partitionMap.put(vertexId, message);
}
}
}
@Override
+  public void addMessage(
+    LongWritable vertexId,
+    DoubleWritable message
+  ) throws IOException {
+    long id = vertexId.get();
+    Long2DoubleOpenHashMap partitionMap = getPartitionMap(vertexId);
+    // The per-partition map is shared, so the read-combine-write sequence
+    // below has to happen under its lock.
+    synchronized (partitionMap) {
+      double value = message.get();
+      // Combine only when a value is really stored for this vertex. A
+      // primitive map answers get() for a missing key with its default
+      // return value, and feeding that to the combiner as if it were a
+      // received message corrupts non-additive combiners (min, max, ...).
+      if (partitionMap.containsKey(id)) {
+        DoubleWritable current = new DoubleWritable(partitionMap.get(id));
+        messageCombiner.combine(vertexId, current, message);
+        value = current.get();
+      }
+      // FIXME: messageCombiner should create an initial message instead
+      partitionMap.put(id, value);
+    }
+  }
+
+ @Override
public void finalizeStore() {
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListMessageStore.java
deleted file mode 100644
index d1c33be..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListMessageStore.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages.primitives.long_id;
-
-import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
-import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.factories.MessageValueFactory;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.partition.Partition;
-import org.apache.giraph.partition.PartitionOwner;
-import org.apache.giraph.utils.VertexIdIterator;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-
-import java.util.List;
-
-/**
- * Special message store to be used when ids are LongWritable and no combiner
- * is used.
- * Uses fastutil primitive maps in order to decrease number of objects and
- * get better performance.
- *
- * @param <M> message type
- * @param <L> list type
- */
-public abstract class LongAbstractListMessageStore<M extends Writable,
- L extends List> extends LongAbstractMessageStore<M, L> {
- /**
- * Map used to store messages for nascent vertices i.e., ones
- * that did not exist at the start of current superstep but will get
- * created because of sending message to a non-existent vertex id
- */
- private final
- Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<L>> nascentMap;
-
- /**
- * Constructor
- *
- * @param messageValueFactory Factory for creating message values
- * @param service Service worker
- * @param config Hadoop configuration
- */
- public LongAbstractListMessageStore(
- MessageValueFactory<M> messageValueFactory,
- CentralizedServiceWorker<LongWritable, Writable, Writable> service,
- ImmutableClassesGiraphConfiguration<LongWritable,
- Writable, Writable> config) {
- super(messageValueFactory, service, config);
- populateMap();
-
- // create map for vertex ids (i.e., nascent vertices) not known yet
- nascentMap = new Int2ObjectOpenHashMap<>();
- for (int partitionId : service.getPartitionStore().getPartitionIds()) {
- nascentMap.put(partitionId, new Long2ObjectOpenHashMap<L>());
- }
- }
-
- /**
- * Populate the map with all vertexIds for each partition
- */
- private void populateMap() { // TODO - can parallelize?
- // populate with vertex ids already known
- service.getPartitionStore().startIteration();
- while (true) {
- Partition partition = service.getPartitionStore().getNextPartition();
- if (partition == null) {
- break;
- }
- Long2ObjectOpenHashMap<L> partitionMap = map.get(partition.getId());
- for (Object obj : partition) {
- Vertex vertex = (Vertex) obj;
- LongWritable vertexId = (LongWritable) vertex.getId();
- partitionMap.put(vertexId.get(), createList());
- }
- service.getPartitionStore().putPartition(partition);
- }
- }
-
- /**
- * Create an instance of L
- * @return instance of L
- */
- protected abstract L createList();
-
- /**
- * Get list for the current vertexId
- *
- * @param iterator vertexId iterator
- * @return list for current vertexId
- */
- protected L getList(
- VertexIdIterator<LongWritable> iterator) {
- PartitionOwner owner =
- service.getVertexPartitionOwner(iterator.getCurrentVertexId());
- long vertexId = iterator.getCurrentVertexId().get();
- int partitionId = owner.getPartitionId();
- Long2ObjectOpenHashMap<L> partitionMap = map.get(partitionId);
- if (!partitionMap.containsKey(vertexId)) {
- synchronized (nascentMap) {
- // assumption: not many nascent vertices are created
- // so overall synchronization is negligible
- Long2ObjectOpenHashMap<L> nascentPartitionMap =
- nascentMap.get(partitionId);
- if (nascentPartitionMap.get(vertexId) == null) {
- nascentPartitionMap.put(vertexId, createList());
- }
- return nascentPartitionMap.get(vertexId);
- }
- }
- return partitionMap.get(vertexId);
- }
-
- @Override
- public void finalizeStore() {
- for (int partitionId : nascentMap.keySet()) {
- // nascent vertices are present only in nascent map
- map.get(partitionId).putAll(nascentMap.get(partitionId));
- }
- nascentMap.clear();
- }
-
- // TODO - discussion
- /*
- some approaches for ensuring correctness with parallel inserts
- - current approach: uses a small extra bit of memory by pre-populating
- map & pushes everything map cannot handle to nascentMap
- at the beginning of next superstep compute a single threaded finalizeStore is
- called (so little extra memory + 1 sequential finish ops)
- - used striped parallel fast utils instead (unsure of perf)
- - use concurrent map (every get gets far slower)
- - use reader writer locks (unsure of perf)
- (code looks something like underneath)
-
- private final ReadWriteLock rwl = new ReentrantReadWriteLock();
- rwl.readLock().lock();
- L list = partitionMap.get(vertexId);
- if (list == null) {
- rwl.readLock().unlock();
- rwl.writeLock().lock();
- if (partitionMap.get(vertexId) == null) {
- list = createList();
- partitionMap.put(vertexId, list);
- }
- rwl.readLock().lock();
- rwl.writeLock().unlock();
- }
- rwl.readLock().unlock();
- - adopted from the article
- http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/locks/\
- ReentrantReadWriteLock.html
- */
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListStore.java
new file mode 100644
index 0000000..aee6a61
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractListStore.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages.primitives.long_id;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+
+import java.util.List;
+
+import org.apache.giraph.comm.messages.PartitionSplitInfo;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.partition.Partition;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Special message store to be used when ids are LongWritable and no combiner
+ * is used.
+ * Uses fastutil primitive maps in order to decrease number of objects and
+ * get better performance.
+ *
+ * @param <M> message type
+ * @param <L> list type
+ */
+public abstract class LongAbstractListStore<M extends Writable,
+  L extends List> extends LongAbstractStore<M, L> {
+  /**
+   * Per-partition lists for nascent vertices, i.e. vertex ids that did not
+   * exist at the start of the current superstep but will get created
+   * because a message was sent to them.
+   */
+  private final
+  Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<L>> nascentMap;
+
+  /**
+   * Constructor
+   *
+   * @param messageValueFactory Factory for creating message values
+   * @param partitionInfo Partition split info
+   * @param config Hadoop configuration
+   */
+  public LongAbstractListStore(
+    MessageValueFactory<M> messageValueFactory,
+    PartitionSplitInfo<LongWritable> partitionInfo,
+    ImmutableClassesGiraphConfiguration<LongWritable,
+      Writable, Writable> config) {
+    super(messageValueFactory, partitionInfo, config);
+    // every vertex id that is already known gets its list up front
+    populateMap();
+
+    // ids that are not known yet (nascent vertices) are collected in a
+    // separate map per partition
+    Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<L>> nascent =
+      new Int2ObjectOpenHashMap<>();
+    for (int partitionId : partitionInfo.getPartitionIds()) {
+      nascent.put(partitionId, new Long2ObjectOpenHashMap<L>());
+    }
+    nascentMap = nascent;
+  }
+
+  /**
+   * Walk over all partitions and register a fresh list for every vertex id
+   * found in them.
+   */
+  private void populateMap() { // TODO - can parallelize?
+    partitionInfo.startIteration();
+    for (Partition partition = partitionInfo.getNextPartition();
+      partition != null;
+      partition = partitionInfo.getNextPartition()) {
+      Long2ObjectOpenHashMap<L> lists = map.get(partition.getId());
+      for (Object element : partition) {
+        LongWritable vertexId = (LongWritable) ((Vertex) element).getId();
+        lists.put(vertexId.get(), createList());
+      }
+      // hand the partition back before asking for the next one
+      partitionInfo.putPartition(partition);
+    }
+  }
+
+  /**
+   * Create an instance of L
+   * @return instance of L
+   */
+  protected abstract L createList();
+
+  /**
+   * Get the list that belongs to a vertex id, creating it in the nascent
+   * map when the id is in neither map yet.
+   *
+   * @param vertexId vertex id
+   * @return list for the given vertexId
+   */
+  protected L getList(LongWritable vertexId) {
+    final long id = vertexId.get();
+    final int partitionId = partitionInfo.getPartitionId(vertexId);
+    L known = map.get(partitionId).get(id);
+    if (known != null) {
+      return known;
+    }
+    Long2ObjectOpenHashMap<L> nascentLists = nascentMap.get(partitionId);
+    // assumption: not many nascent vertices are created
+    // so overall synchronization is negligible
+    synchronized (nascentLists) {
+      L nascent = nascentLists.get(id);
+      if (nascent == null) {
+        nascent = createList();
+        nascentLists.put(id, nascent);
+      }
+      return nascent;
+    }
+  }
+
+  @Override
+  public void finalizeStore() {
+    // nascent vertices are present only in nascent map, so move them over
+    for (int partitionId : nascentMap.keySet()) {
+      Long2ObjectOpenHashMap<L> nascentLists = nascentMap.get(partitionId);
+      map.get(partitionId).putAll(nascentLists);
+    }
+    nascentMap.clear();
+  }
+
+  @Override
+  public boolean hasMessagesForVertex(LongWritable vertexId) {
+    final int partitionId = partitionInfo.getPartitionId(vertexId);
+    final long id = vertexId.get();
+    // a pre-populated list only counts when something was added to it
+    L known = map.get(partitionId).get(id);
+    if (known != null && !known.isEmpty()) {
+      return true;
+    }
+    // for nascent vertices the mere presence of an entry is enough
+    Long2ObjectOpenHashMap<L> nascentLists = nascentMap.get(partitionId);
+    if (nascentLists == null) {
+      return false;
+    }
+    return nascentLists.containsKey(id);
+  }
+
+  // TODO - discussion
+  /*
+  some approaches for ensuring correctness with parallel inserts
+  - current approach: uses a small extra bit of memory by pre-populating
+  map & pushes everything map cannot handle to nascentMap
+  at the beginning of next superstep compute a single threaded finalizeStore
+  is called (so little extra memory + 1 sequential finish ops)
+  - use striped parallel fast utils instead (unsure of perf)
+  - use concurrent map (every get gets far slower)
+  - use reader writer locks (unsure of perf)
+  (code looks something like underneath)
+
+      private final ReadWriteLock rwl = new ReentrantReadWriteLock();
+      rwl.readLock().lock();
+      L list = partitionMap.get(vertexId);
+      if (list == null) {
+        rwl.readLock().unlock();
+        rwl.writeLock().lock();
+        if (partitionMap.get(vertexId) == null) {
+          list = createList();
+          partitionMap.put(vertexId, list);
+        }
+        rwl.readLock().lock();
+        rwl.writeLock().unlock();
+      }
+      rwl.readLock().unlock();
+  - adopted from the article
+    http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/locks/\
+    ReentrantReadWriteLock.html
+  */
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java
deleted file mode 100644
index b3ed4b2..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractMessageStore.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages.primitives.long_id;
-
-import com.google.common.collect.Lists;
-import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
-import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
-import it.unimi.dsi.fastutil.longs.LongIterator;
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.messages.MessageStore;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.factories.MessageValueFactory;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-
-import java.util.List;
-
-/**
- * Special message store to be used when ids are LongWritable and no combiner
- * is used.
- * Uses fastutil primitive maps in order to decrease number of objects and
- * get better performance.
- *
- * @param <M> message type
- * @param <T> datastructure used to hold messages
- */
-public abstract class LongAbstractMessageStore<M extends Writable, T>
- implements MessageStore<LongWritable, M> {
- /** Message value factory */
- protected final MessageValueFactory<M> messageValueFactory;
- /** Map from partition id to map from vertex id to message */
- protected final
- Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<T>> map;
- /** Service worker */
- protected final CentralizedServiceWorker<LongWritable, ?, ?> service;
- /** Giraph configuration */
- protected final ImmutableClassesGiraphConfiguration<LongWritable, ?, ?>
- config;
-
- /**
- * Constructor
- *
- * @param messageValueFactory Factory for creating message values
- * @param service Service worker
- * @param config Hadoop configuration
- */
- public LongAbstractMessageStore(
- MessageValueFactory<M> messageValueFactory,
- CentralizedServiceWorker<LongWritable, Writable, Writable> service,
- ImmutableClassesGiraphConfiguration<LongWritable, Writable, Writable>
- config) {
- this.messageValueFactory = messageValueFactory;
- this.service = service;
- this.config = config;
-
- map = new Int2ObjectOpenHashMap<>();
- for (int partitionId : service.getPartitionStore().getPartitionIds()) {
- Long2ObjectOpenHashMap<T> partitionMap = new Long2ObjectOpenHashMap<T>(
- (int) service.getPartitionStore()
- .getPartitionVertexCount(partitionId));
- map.put(partitionId, partitionMap);
- }
- }
-
- /**
- * Get map which holds messages for partition which vertex belongs to.
- *
- * @param vertexId Id of the vertex
- * @return Map which holds messages for partition which vertex belongs to.
- */
- protected Long2ObjectOpenHashMap<T> getPartitionMap(
- LongWritable vertexId) {
- return map.get(service.getPartitionId(vertexId));
- }
-
- @Override
- public void clearPartition(int partitionId) {
- map.get(partitionId).clear();
- }
-
- @Override
- public boolean hasMessagesForVertex(LongWritable vertexId) {
- return getPartitionMap(vertexId).containsKey(vertexId.get());
- }
-
- @Override
- public boolean hasMessagesForPartition(int partitionId) {
- Long2ObjectOpenHashMap<T> partitionMessages = map.get(partitionId);
- return partitionMessages != null && !partitionMessages.isEmpty();
- }
-
- @Override
- public void clearVertexMessages(LongWritable vertexId) {
- getPartitionMap(vertexId).remove(vertexId.get());
- }
-
-
- @Override
- public void clearAll() {
- map.clear();
- }
-
- @Override
- public Iterable<LongWritable> getPartitionDestinationVertices(
- int partitionId) {
- Long2ObjectOpenHashMap<T> partitionMap =
- map.get(partitionId);
- List<LongWritable> vertices =
- Lists.newArrayListWithCapacity(partitionMap.size());
- LongIterator iterator = partitionMap.keySet().iterator();
- while (iterator.hasNext()) {
- vertices.add(new LongWritable(iterator.nextLong()));
- }
- return vertices;
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractStore.java
new file mode 100644
index 0000000..385388d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongAbstractStore.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages.primitives.long_id;
+
+import com.google.common.collect.Lists;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
+import it.unimi.dsi.fastutil.longs.LongIterator;
+
+import java.util.List;
+
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.comm.messages.PartitionSplitInfo;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Special message store to be used when ids are LongWritable and no combiner
+ * is used.
+ * Uses fastutil primitive maps in order to decrease number of objects and
+ * get better performance.
+ *
+ * @param <M> message type
+ * @param <T> datastructure used to hold messages
+ */
+public abstract class LongAbstractStore<M extends Writable, T>
+  implements MessageStore<LongWritable, M> {
+  /** Message value factory */
+  protected final MessageValueFactory<M> messageValueFactory;
+  /** Map from partition id to map from vertex id to message */
+  protected final
+  Int2ObjectOpenHashMap<Long2ObjectOpenHashMap<T>> map;
+  /** Partition split info */
+  protected final PartitionSplitInfo<LongWritable> partitionInfo;
+  /** Giraph configuration */
+  protected final ImmutableClassesGiraphConfiguration<LongWritable, ?, ?>
+  config;
+
+  /**
+   * Constructor
+   *
+   * @param messageValueFactory Factory for creating message values
+   * @param partitionInfo Partition split info
+   * @param config Hadoop configuration
+   */
+  public LongAbstractStore(
+    MessageValueFactory<M> messageValueFactory,
+    PartitionSplitInfo<LongWritable> partitionInfo,
+    ImmutableClassesGiraphConfiguration<LongWritable, Writable, Writable>
+      config) {
+    this.messageValueFactory = messageValueFactory;
+    this.partitionInfo = partitionInfo;
+    this.config = config;
+
+    // one vertex-id map per partition, sized by the partition vertex count
+    map = new Int2ObjectOpenHashMap<>();
+    for (int partitionId : partitionInfo.getPartitionIds()) {
+      int expectedSize =
+        (int) partitionInfo.getPartitionVertexCount(partitionId);
+      map.put(partitionId, new Long2ObjectOpenHashMap<T>(expectedSize));
+    }
+  }
+
+  /**
+   * Look up the per-partition map for the partition a vertex belongs to.
+   *
+   * @param vertexId Id of the vertex
+   * @return Map which holds messages for partition which vertex belongs to.
+   */
+  protected Long2ObjectOpenHashMap<T> getPartitionMap(
+    LongWritable vertexId) {
+    int partitionId = partitionInfo.getPartitionId(vertexId);
+    return map.get(partitionId);
+  }
+
+  @Override
+  public void clearPartition(int partitionId) {
+    Long2ObjectOpenHashMap<T> partitionMap = map.get(partitionId);
+    partitionMap.clear();
+  }
+
+  @Override
+  public boolean hasMessagesForVertex(LongWritable vertexId) {
+    long id = vertexId.get();
+    return getPartitionMap(vertexId).containsKey(id);
+  }
+
+  @Override
+  public boolean hasMessagesForPartition(int partitionId) {
+    Long2ObjectOpenHashMap<T> partitionMessages = map.get(partitionId);
+    if (partitionMessages == null) {
+      return false;
+    }
+    return !partitionMessages.isEmpty();
+  }
+
+  @Override
+  public void clearVertexMessages(LongWritable vertexId) {
+    long id = vertexId.get();
+    getPartitionMap(vertexId).remove(id);
+  }
+
+  @Override
+  public void clearAll() {
+    map.clear();
+  }
+
+  @Override
+  public Iterable<LongWritable> getPartitionDestinationVertices(
+    int partitionId) {
+    Long2ObjectOpenHashMap<T> partitionMap = map.get(partitionId);
+    List<LongWritable> vertices =
+      Lists.newArrayListWithCapacity(partitionMap.size());
+    // wrap every primitive key of the partition map in a fresh LongWritable
+    for (LongIterator ids = partitionMap.keySet().iterator();
+      ids.hasNext();) {
+      vertices.add(new LongWritable(ids.nextLong()));
+    }
+    return vertices;
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongByteArrayMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongByteArrayMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongByteArrayMessageStore.java
deleted file mode 100644
index bcdab98..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongByteArrayMessageStore.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages.primitives.long_id;
-
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.messages.MessagesIterable;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.factories.MessageValueFactory;
-import org.apache.giraph.utils.VertexIdMessageBytesIterator;
-import org.apache.giraph.utils.VertexIdMessageIterator;
-import org.apache.giraph.utils.VertexIdMessages;
-import org.apache.giraph.utils.VerboseByteStructMessageWrite;
-import org.apache.giraph.utils.EmptyIterable;
-import org.apache.giraph.utils.io.DataInputOutput;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-
-import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
-import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
-import it.unimi.dsi.fastutil.objects.ObjectIterator;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * Special message store to be used when ids are LongWritable and no combiner
- * is used.
- * Uses fastutil primitive maps in order to decrease number of objects and
- * get better performance.
- *
- * @param <M> Message type
- */
-public class LongByteArrayMessageStore<M extends Writable>
- extends LongAbstractMessageStore<M, DataInputOutput> {
-
- /**
- * Constructor
- *
- * @param messageValueFactory Factory for creating message values
- * @param service Service worker
- * @param config Hadoop configuration
- */
- public LongByteArrayMessageStore(
- MessageValueFactory<M> messageValueFactory,
- CentralizedServiceWorker<LongWritable, Writable, Writable> service,
- ImmutableClassesGiraphConfiguration<LongWritable,
- Writable, Writable> config) {
- super(messageValueFactory, service, config);
- }
-
- @Override
- public boolean isPointerListEncoding() {
- return false;
- }
-
- /**
- * Get the DataInputOutput for a vertex id, creating if necessary.
- *
- * @param partitionMap Partition map to look in
- * @param vertexId Id of the vertex
- * @return DataInputOutput for this vertex id (created if necessary)
- */
- private DataInputOutput getDataInputOutput(
- Long2ObjectOpenHashMap<DataInputOutput> partitionMap, long vertexId) {
- DataInputOutput dataInputOutput = partitionMap.get(vertexId);
- if (dataInputOutput == null) {
- dataInputOutput = config.createMessagesInputOutput();
- partitionMap.put(vertexId, dataInputOutput);
- }
- return dataInputOutput;
- }
-
- @Override
- public void addPartitionMessages(int partitionId,
- VertexIdMessages<LongWritable, M> messages) {
- Long2ObjectOpenHashMap<DataInputOutput> partitionMap = map.get(partitionId);
- synchronized (partitionMap) {
- VertexIdMessageBytesIterator<LongWritable, M>
- vertexIdMessageBytesIterator =
- messages.getVertexIdMessageBytesIterator();
- // Try to copy the message buffer over rather than
- // doing a deserialization of a message just to know its size. This
- // should be more efficient for complex objects where serialization is
- // expensive. If this type of iterator is not available, fall back to
- // deserializing/serializing the messages
- if (vertexIdMessageBytesIterator != null) {
- while (vertexIdMessageBytesIterator.hasNext()) {
- vertexIdMessageBytesIterator.next();
- DataInputOutput dataInputOutput = getDataInputOutput(partitionMap,
- vertexIdMessageBytesIterator.getCurrentVertexId().get());
- vertexIdMessageBytesIterator.writeCurrentMessageBytes(
- dataInputOutput.getDataOutput());
- }
- } else {
- try {
- VertexIdMessageIterator<LongWritable, M>
- iterator = messages.getVertexIdMessageIterator();
- while (iterator.hasNext()) {
- iterator.next();
- DataInputOutput dataInputOutput = getDataInputOutput(partitionMap,
- iterator.getCurrentVertexId().get());
- VerboseByteStructMessageWrite.verboseWriteCurrentMessage(iterator,
- dataInputOutput.getDataOutput());
- }
- } catch (IOException e) {
- throw new RuntimeException("addPartitionMessages: IOException while" +
- " adding messages for a partition: " + e);
- }
- }
- }
- }
-
- @Override
- public void finalizeStore() {
- }
-
- @Override
- public Iterable<M> getVertexMessages(
- LongWritable vertexId) {
- DataInputOutput dataInputOutput =
- getPartitionMap(vertexId).get(vertexId.get());
- if (dataInputOutput == null) {
- return EmptyIterable.get();
- } else {
- return new MessagesIterable<M>(dataInputOutput, messageValueFactory);
- }
- }
-
- @Override
- public void writePartition(DataOutput out, int partitionId)
- throws IOException {
- Long2ObjectOpenHashMap<DataInputOutput> partitionMap =
- map.get(partitionId);
- out.writeInt(partitionMap.size());
- ObjectIterator<Long2ObjectMap.Entry<DataInputOutput>> iterator =
- partitionMap.long2ObjectEntrySet().fastIterator();
- while (iterator.hasNext()) {
- Long2ObjectMap.Entry<DataInputOutput> entry = iterator.next();
- out.writeLong(entry.getLongKey());
- entry.getValue().write(out);
- }
- }
-
- @Override
- public void readFieldsForPartition(DataInput in,
- int partitionId) throws IOException {
- int size = in.readInt();
- Long2ObjectOpenHashMap<DataInputOutput> partitionMap =
- new Long2ObjectOpenHashMap<DataInputOutput>(size);
- while (size-- > 0) {
- long vertexId = in.readLong();
- DataInputOutput dataInputOutput = config.createMessagesInputOutput();
- dataInputOutput.readFields(in);
- partitionMap.put(vertexId, dataInputOutput);
- }
- synchronized (map) {
- map.put(partitionId, partitionMap);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListMessageStore.java
deleted file mode 100644
index eef75ba..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListMessageStore.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.comm.messages.primitives.long_id;
-
-import it.unimi.dsi.fastutil.longs.LongArrayList;
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.messages.MessageStore;
-import org.apache.giraph.comm.messages.PointerListMessagesIterable;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.factories.MessageValueFactory;
-import org.apache.giraph.utils.EmptyIterable;
-import org.apache.giraph.utils.ExtendedByteArrayOutputBuffer;
-import org.apache.giraph.utils.ExtendedDataOutput;
-import org.apache.giraph.utils.VertexIdMessageIterator;
-import org.apache.giraph.utils.VertexIdMessages;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Writable;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import static org.apache.giraph.utils.ExtendedByteArrayOutputBuffer.IndexAndDataOut;
-
-/**
- * This stores messages in
- * {@link org.apache.giraph.utils.ExtendedByteArrayOutputBuffer}
- * and stores long pointers that point to serialized messages
- *
- * @param <M> message type
- */
-public class LongPointerListMessageStore<M extends Writable>
- extends LongAbstractListMessageStore<M, LongArrayList>
- implements MessageStore<LongWritable, M> {
-
- /** Buffers of byte array outputs used to store messages - thread safe */
- private final ExtendedByteArrayOutputBuffer bytesBuffer;
-
- /**
- * Constructor
- *
- * @param messageValueFactory Factory for creating message values
- * @param service Service worker
- * @param config Hadoop configuration
- */
- public LongPointerListMessageStore(
- MessageValueFactory<M> messageValueFactory,
- CentralizedServiceWorker<LongWritable, Writable, Writable> service,
- ImmutableClassesGiraphConfiguration<LongWritable,
- Writable, Writable> config) {
- super(messageValueFactory, service, config);
- bytesBuffer = new ExtendedByteArrayOutputBuffer(config);
- }
-
- @Override
- public boolean isPointerListEncoding() {
- return true;
- }
-
- @Override
- protected LongArrayList createList() {
- return new LongArrayList();
- }
-
- @Override
- public void addPartitionMessages(int partitionId,
- VertexIdMessages<LongWritable, M> messages) {
- try {
- VertexIdMessageIterator<LongWritable, M> iterator =
- messages.getVertexIdMessageIterator();
- long pointer = 0;
- LongArrayList list;
- while (iterator.hasNext()) {
- iterator.next();
- M msg = iterator.getCurrentMessage();
- list = getList(iterator);
- if (iterator.isNewMessage()) {
- IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut();
- pointer = indexAndDataOut.getIndex();
- pointer <<= 32;
- ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput();
- pointer += dataOutput.getPos();
- msg.write(dataOutput);
- }
- synchronized (list) { // TODO - any better way?
- list.add(pointer);
- }
- }
- } catch (IOException e) {
- throw new RuntimeException("addPartitionMessages: IOException while" +
- " adding messages for a partition: " + e);
- }
- }
-
- @Override
- public Iterable<M> getVertexMessages(
- LongWritable vertexId) {
- LongArrayList list = getPartitionMap(vertexId).get(
- vertexId.get());
- if (list == null) {
- return EmptyIterable.get();
- } else {
- return new PointerListMessagesIterable<>(messageValueFactory,
- list, bytesBuffer);
- }
- }
-
- // FIXME -- complete these for check-pointing
- @Override
- public void writePartition(DataOutput out, int partitionId)
- throws IOException {
- }
-
- @Override
- public void readFieldsForPartition(DataInput in, int partitionId)
- throws IOException {
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListPerVertexStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListPerVertexStore.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListPerVertexStore.java
new file mode 100644
index 0000000..525225c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/primitives/long_id/LongPointerListPerVertexStore.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.messages.primitives.long_id;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.comm.messages.PartitionSplitInfo;
+import org.apache.giraph.comm.messages.PointerListMessagesIterable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.factories.MessageValueFactory;
+import org.apache.giraph.utils.EmptyIterable;
+import org.apache.giraph.utils.ExtendedByteArrayOutputBuffer;
+import org.apache.giraph.utils.ExtendedByteArrayOutputBuffer.IndexAndDataOut;
+import org.apache.giraph.utils.ExtendedDataOutput;
+import org.apache.giraph.utils.VertexIdMessageIterator;
+import org.apache.giraph.utils.VertexIdMessages;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * This stores messages in
+ * {@link org.apache.giraph.utils.ExtendedByteArrayOutputBuffer}
+ * and stores long pointers that point to serialized messages
+ *
+ * @param <M> message type
+ */
+public class LongPointerListPerVertexStore<M extends Writable>
+ extends LongAbstractListStore<M, LongArrayList>
+ implements MessageStore<LongWritable, M> {
+
+ /** Buffers of byte array outputs used to store messages - thread safe */
+ private final ExtendedByteArrayOutputBuffer bytesBuffer;
+
+ /**
+ * Constructor
+ *
+ * @param messageValueFactory Factory for creating message values
+ * @param partitionInfo Partition split info
+ * @param config Hadoop configuration
+ */
+ public LongPointerListPerVertexStore(
+ MessageValueFactory<M> messageValueFactory,
+ PartitionSplitInfo<LongWritable> partitionInfo,
+ ImmutableClassesGiraphConfiguration<LongWritable,
+ Writable, Writable> config) {
+ super(messageValueFactory, partitionInfo, config);
+ bytesBuffer = new ExtendedByteArrayOutputBuffer(config);
+ }
+
+ @Override
+ public boolean isPointerListEncoding() {
+ return true;
+ }
+
+ @Override
+ protected LongArrayList createList() {
+ return new LongArrayList();
+ }
+
+ @Override
+ public void addPartitionMessages(
+ int partitionId,
+ VertexIdMessages<LongWritable, M> messages
+ ) {
+ try {
+ VertexIdMessageIterator<LongWritable, M> iterator =
+ messages.getVertexIdMessageIterator();
+ long pointer = 0;
+ LongArrayList list;
+ while (iterator.hasNext()) {
+ iterator.next();
+ M msg = iterator.getCurrentMessage();
+ list = getList(iterator.getCurrentVertexId());
+
+ if (iterator.isNewMessage()) {
+ IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut();
+ pointer = indexAndDataOut.getIndex();
+ pointer <<= 32;
+ ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput();
+ pointer += dataOutput.getPos();
+ msg.write(dataOutput);
+ }
+ synchronized (list) { // TODO - any better way?
+ list.add(pointer);
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("addPartitionMessages: IOException while" +
+ " adding messages for a partition: " + e);
+ }
+ }
+
+ @Override
+ public void addMessage(LongWritable vertexId, M message) throws IOException {
+ LongArrayList list = getList(vertexId);
+ IndexAndDataOut indexAndDataOut = bytesBuffer.getIndexAndDataOut();
+ long pointer = indexAndDataOut.getIndex();
+ pointer <<= 32;
+ ExtendedDataOutput dataOutput = indexAndDataOut.getDataOutput();
+ pointer += dataOutput.getPos();
+ message.write(dataOutput);
+
+ synchronized (list) {
+ list.add(pointer);
+ }
+ }
+
+ @Override
+ public Iterable<M> getVertexMessages(LongWritable vertexId) {
+ LongArrayList list = getPartitionMap(vertexId).get(
+ vertexId.get());
+ if (list == null) {
+ return EmptyIterable.get();
+ } else {
+ return new PointerListMessagesIterable<>(messageValueFactory,
+ list, bytesBuffer);
+ }
+ }
+
+ // FIXME -- complete these for check-pointing
+ @Override
+ public void writePartition(DataOutput out, int partitionId)
+ throws IOException {
+ }
+
+ @Override
+ public void readFieldsForPartition(DataInput in, int partitionId)
+ throws IOException {
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java b/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java
index 6273694..e8f00ec 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapper.java
@@ -19,12 +19,6 @@ package org.apache.giraph.comm.messages.queue;
import it.unimi.dsi.fastutil.ints.Int2IntArrayMap;
import it.unimi.dsi.fastutil.ints.Int2IntMap;
-import org.apache.giraph.comm.messages.MessageStore;
-import org.apache.giraph.utils.ThreadUtils;
-import org.apache.giraph.utils.VertexIdMessages;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
@@ -35,6 +29,13 @@ import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
+import org.apache.giraph.comm.messages.MessageStore;
+import org.apache.giraph.utils.ThreadUtils;
+import org.apache.giraph.utils.VertexIdMessages;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
/**
* This class decouples message receiving and processing
* into separate threads thus reducing contention.
@@ -157,6 +158,12 @@ public final class AsyncMessageStoreWrapper<I extends WritableComparable,
}
@Override
+ public void addMessage(I vertexId, M message) throws IOException {
+ // TODO: implement if LocalBlockRunner needs async message store
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public void finalizeStore() {
store.finalizeStore();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java
index c8d0f79..de67af4 100644
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/data/DiskBackedMessageStore.java
@@ -18,6 +18,10 @@
package org.apache.giraph.ooc.data;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
import org.apache.giraph.comm.messages.MessageStore;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.factories.MessageValueFactory;
@@ -32,9 +36,6 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
/**
* Implementation of a message store used for out-of-core mechanism.
@@ -137,6 +138,26 @@ public class DiskBackedMessageStore<I extends WritableComparable,
}
}
+ @Override
+ public void addMessage(I vertexId, M message) throws IOException {
+ if (useMessageCombiner) {
+ messageStore.addMessage(vertexId, message);
+ } else {
+ // TODO: implement if LocalBlockRunner needs this message store
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ /**
+ * Gets the path that should be used specifically for message data.
+ *
+ * @param basePath path prefix to build the actual path from
+ * @param superstep superstep for which message data should be stored
+ * @return path to files specific for message data
+ */
+ private static String getPath(String basePath, long superstep) {
+ return basePath + "_messages-S" + superstep;
+ }
@Override
public long loadPartitionData(int partitionId)
http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/Basic2ObjectMap.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/Basic2ObjectMap.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/Basic2ObjectMap.java
index 299dced..8f00293 100644
--- a/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/Basic2ObjectMap.java
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/Basic2ObjectMap.java
@@ -23,23 +23,29 @@ import it.unimi.dsi.fastutil.ints.IntIterator;
import it.unimi.dsi.fastutil.longs.Long2ObjectMap;
import it.unimi.dsi.fastutil.longs.Long2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.longs.LongIterator;
+import it.unimi.dsi.fastutil.objects.Object2ObjectMap;
+import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.objects.ObjectIterator;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.Collection;
import java.util.Iterator;
import org.apache.giraph.types.ops.IntTypeOps;
import org.apache.giraph.types.ops.LongTypeOps;
import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
+import org.apache.giraph.utils.WritableUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
+import com.google.common.base.Preconditions;
+
/**
* Basic2ObjectMap with only basic set of operations.
- * All operations that return object T are returning reusable object,
+ * All operations that return an object of type K return a reusable object,
* which is modified after calling any other function.
*
* @param <K> Key type
@@ -81,13 +87,11 @@ public abstract class Basic2ObjectMap<K, V> implements Writable {
* @return the old value, or null if no value was present for the given key.
*/
public abstract V remove(K key);
-
/**
* TypeOps for type of keys this object holds
* @return TypeOps
*/
public abstract PrimitiveIdTypeOps<K> getKeyTypeOps();
-
/**
* Fast iterator over keys within this map, which doesn't allocate new
* element for each returned element.
@@ -100,6 +104,20 @@ public abstract class Basic2ObjectMap<K, V> implements Writable {
public abstract Iterator<K> fastKeyIterator();
/**
+ * Iterator over map values.
+ *
+ * @return Iterator
+ */
+ public abstract Iterator<V> valueIterator();
+
+ /**
+ * A collection of all values.
+ *
+ * @return Collection of all values
+ */
+ public abstract Collection<V> values();
+
+ /**
* Iterator that reuses key object.
*
* @param <Iter> Primitive key iterator type
@@ -212,6 +230,16 @@ public abstract class Basic2ObjectMap<K, V> implements Writable {
}
@Override
+ public Iterator<V> valueIterator() {
+ return map.values().iterator();
+ }
+
+ @Override
+ public Collection<V> values() {
+ return map.values();
+ }
+
+ @Override
public void write(DataOutput out) throws IOException {
out.writeInt(map.size());
ObjectIterator<Int2ObjectMap.Entry<V>> iterator =
@@ -319,7 +347,22 @@ public abstract class Basic2ObjectMap<K, V> implements Writable {
}
@Override
+ public Iterator<V> valueIterator() {
+ return map.values().iterator();
+ }
+
+ @Override
+ public Collection<V> values() {
+ return map.values();
+ }
+
+ @Override
public void write(DataOutput out) throws IOException {
+ Preconditions.checkState(
+ valueWriter != null,
+ "valueWriter is not provided"
+ );
+
out.writeInt(map.size());
ObjectIterator<Long2ObjectMap.Entry<V>> iterator =
map.long2ObjectEntrySet().fastIterator();
@@ -332,6 +375,11 @@ public abstract class Basic2ObjectMap<K, V> implements Writable {
@Override
public void readFields(DataInput in) throws IOException {
+ Preconditions.checkState(
+ valueWriter != null,
+ "valueWriter is not provided"
+ );
+
int size = in.readInt();
map.clear();
map.trim(size);
@@ -342,4 +390,141 @@ public abstract class Basic2ObjectMap<K, V> implements Writable {
}
}
}
+
+ /** Basic2ObjectMap implementation for arbitrary Writable keys */
+ public static final class BasicObject2ObjectOpenHashMap<K extends Writable, V>
+ extends Basic2ObjectMap<K, V> {
+ /** Map */
+ private final Object2ObjectOpenHashMap<K, V> map;
+ /** Key writer */
+ private final WritableWriter<K> keyWriter;
+ /** Value writer */
+ private final WritableWriter<V> valueWriter;
+
+ /**
+ * Constructor
+ *
+ * @param keyWriter Writer of keys
+ * @param valueWriter Writer of values
+ */
+ public BasicObject2ObjectOpenHashMap(
+ WritableWriter<K> keyWriter,
+ WritableWriter<V> valueWriter
+ ) {
+ this.map = new Object2ObjectOpenHashMap<>();
+ this.keyWriter = keyWriter;
+ this.valueWriter = valueWriter;
+ }
+
+ /**
+ * Constructor
+ *
+ * @param capacity Map capacity
+ * @param keyWriter Writer of keys
+ * @param valueWriter Writer of values
+ */
+ public BasicObject2ObjectOpenHashMap(
+ int capacity,
+ WritableWriter<K> keyWriter,
+ WritableWriter<V> valueWriter
+ ) {
+ this.map = new Object2ObjectOpenHashMap<>(capacity);
+ this.keyWriter = keyWriter;
+ this.valueWriter = valueWriter;
+ }
+
+ @Override
+ public void clear() {
+ map.clear();
+ }
+
+ @Override
+ public int size() {
+ return map.size();
+ }
+
+ @Override
+ public boolean containsKey(K key) {
+ return map.containsKey(key);
+ }
+
+ @Override
+ public V put(K key, V value) {
+ // we need a copy since the key object is mutable
+ K copyKey = WritableUtils.createCopy(key);
+ return map.put(copyKey, value);
+ }
+
+ @Override
+ public V get(K key) {
+ return map.get(key);
+ }
+
+ @Override
+ public V remove(K key) {
+ return map.remove(key);
+ }
+
+ @Override
+ public PrimitiveIdTypeOps<K> getKeyTypeOps() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Iterator<K> fastKeyIterator() {
+ return map.keySet().iterator();
+ }
+
+ @Override
+ public Iterator<V> valueIterator() {
+ return map.values().iterator();
+ }
+
+ @Override
+ public Collection<V> values() {
+ return map.values();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ Preconditions.checkState(
+ keyWriter != null,
+ "keyWriter is not provided"
+ );
+ Preconditions.checkState(
+ valueWriter != null,
+ "valueWriter is not provided"
+ );
+
+ out.writeInt(map.size());
+ ObjectIterator<Object2ObjectMap.Entry<K, V>> iterator =
+ map.object2ObjectEntrySet().fastIterator();
+ while (iterator.hasNext()) {
+ Object2ObjectMap.Entry<K, V> entry = iterator.next();
+ keyWriter.write(out, entry.getKey());
+ valueWriter.write(out, entry.getValue());
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ Preconditions.checkState(
+ keyWriter != null,
+ "keyWriter is not provided"
+ );
+ Preconditions.checkState(
+ valueWriter != null,
+ "valueWriter is not provided"
+ );
+
+ int size = in.readInt();
+ map.clear();
+ map.trim(size);
+ while (size-- > 0) {
+ K key = keyWriter.readFields(in);
+ V value = valueWriter.readFields(in);
+ map.put(key, value);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/BasicCollectionsUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/BasicCollectionsUtils.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/BasicCollectionsUtils.java
new file mode 100644
index 0000000..23df7d3
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/BasicCollectionsUtils.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.types.ops.collections;
+
+import org.apache.giraph.types.ops.PrimitiveIdTypeOps;
+import org.apache.giraph.types.ops.TypeOpsUtils;
+import org.apache.giraph.types.ops.collections.Basic2ObjectMap.BasicObject2ObjectOpenHashMap;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Utility functions for constructing basic collections
+ */
+public class BasicCollectionsUtils {
+ /** No instances */
+ private BasicCollectionsUtils() { }
+
+ /**
+ * Construct OpenHashMap for the given id class, without key/value writers.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Value type
+ * @param idClass Class type
+ * @return map
+ */
+ public static <I extends Writable, V>
+ Basic2ObjectMap<I, V> create2ObjectMap(Class<I> idClass) {
+ return create2ObjectMap(idClass, null, null);
+ }
+
+ /**
+ * Construct OpenHashMap, using a primitive-key map when the id type allows.
+ *
+ * If keyWriter/valueWriter are not provided,
+ * readFields/write will throw an Exception, if called.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Value type
+ * @param idClass Class type
+ * @param keyWriter writer for keys
+ * @param valueWriter writer for values
+ * @return map
+ */
+ public static <I extends Writable, V>
+ Basic2ObjectMap<I, V> create2ObjectMap(
+ Class<I> idClass,
+ WritableWriter<I> keyWriter,
+ WritableWriter<V> valueWriter
+ ) {
+ PrimitiveIdTypeOps<I> idTypeOps = TypeOpsUtils.getPrimitiveIdTypeOpsOrNull(
+ idClass
+ );
+ if (idTypeOps != null) {
+ return idTypeOps.create2ObjectOpenHashMap(valueWriter);
+ } else {
+ return new BasicObject2ObjectOpenHashMap<>(keyWriter, valueWriter);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java b/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java
index 2865a53..46f7b48 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/TestGraph.java
@@ -18,7 +18,7 @@
package org.apache.giraph.utils;
-import java.util.HashMap;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
@@ -29,12 +29,13 @@ import org.apache.giraph.edge.Edge;
import org.apache.giraph.edge.EdgeFactory;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexValueCombiner;
+import org.apache.giraph.types.ops.collections.Basic2ObjectMap;
+import org.apache.giraph.types.ops.collections.BasicCollectionsUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import com.google.common.base.Objects;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
/**
* TestGraph class for in-memory testing.
@@ -50,7 +51,7 @@ public class TestGraph<I extends WritableComparable,
/** Vertex value combiner */
protected final VertexValueCombiner<V> vertexValueCombiner;
/** The vertex values */
- protected HashMap<I, Vertex<I, V, E>> vertices = Maps.newHashMap();
+ protected Basic2ObjectMap<I, Vertex<I, V, E>> vertices;
/** The configuration */
protected ImmutableClassesGiraphConfiguration<I, V, E> conf;
@@ -60,12 +61,19 @@ public class TestGraph<I extends WritableComparable,
* @param conf Should have vertex and edge classes set.
*/
public TestGraph(GiraphConfiguration conf) {
- this.conf = new ImmutableClassesGiraphConfiguration(conf);
+ this.conf = new ImmutableClassesGiraphConfiguration<>(conf);
vertexValueCombiner = this.conf.createVertexValueCombiner();
+ vertices = BasicCollectionsUtils.create2ObjectMap(
+ this.conf.getVertexIdClass()
+ );
}
- public HashMap<I, Vertex<I, V, E>> getVertices() {
- return vertices;
+ public Collection<Vertex<I, V, E>> getVertices() {
+ return vertices.values();
+ }
+
+ public int getVertexCount() {
+ return vertices.size();
}
public ImmutableClassesGiraphConfiguration<I, V, E> getConf() {
@@ -108,7 +116,7 @@ public class TestGraph<I extends WritableComparable,
* @return this
*/
public TestGraph<I, V, E> addVertex(I id, V value,
- Entry<I, E>... edges) {
+ Entry<I, E>... edges) {
addVertex(makeVertex(id, value, edges));
return this;
}
@@ -174,14 +182,6 @@ public class TestGraph<I extends WritableComparable,
.addEdge(EdgeFactory.create(toVertex, edgeValue));
return this;
}
- /**
- * An iterator over the ids
- *
- * @return the iterator
- */
- public Iterator<I> idIterator() {
- return vertices.keySet().iterator();
- }
/**
* An iterator over the vertices
@@ -190,7 +190,7 @@ public class TestGraph<I extends WritableComparable,
*/
@Override
public Iterator<Vertex<I, V, E>> iterator() {
- return vertices.values().iterator();
+ return vertices.valueIterator();
}
/**
http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteStructMessageWrite.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteStructMessageWrite.java b/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteStructMessageWrite.java
index aa25490..c3c43fb 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteStructMessageWrite.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/VerboseByteStructMessageWrite.java
@@ -18,12 +18,12 @@
package org.apache.giraph.utils;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
import java.io.DataOutput;
import java.io.IOException;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
/** Verbose error message for ByteArray based messages */
public class VerboseByteStructMessageWrite {
/**
@@ -43,13 +43,37 @@ public class VerboseByteStructMessageWrite {
* @throws IOException
* @throws RuntimeException
*/
- public static <I extends WritableComparable, M extends Writable> void
- verboseWriteCurrentMessage(VertexIdMessageIterator<I, M> iterator,
- DataOutput out) throws IOException {
+ public static <I extends WritableComparable, M extends Writable>
+ void verboseWriteCurrentMessage(
+ VertexIdMessageIterator<I, M> iterator,
+ DataOutput out
+ ) throws IOException {
+ verboseWriteCurrentMessage(
+ iterator.getCurrentVertexId(), iterator.getCurrentMessage(), out);
+ }
+
+ /**
+ * Write the given message to the output. If the write fails with a
+ * NegativeArraySizeException, report it through handleNegativeArraySize
+ * so that the error names the vertex the message was sent to.
+ *
+ * @param vertexId id of the vertex the message is for; only used when
+ *                 reporting the error
+ * @param message message to write
+ * @param out DataOutput the message is written to
+ * @param <I> vertexId type
+ * @param <M> message type
+ * @throws IOException if writing the message to out fails
+ * @throws RuntimeException raised by handleNegativeArraySize when the
+ *                          write throws NegativeArraySizeException
+ */
+ public static <I extends WritableComparable, M extends Writable>
+ void verboseWriteCurrentMessage(
+ I vertexId,
+ M message,
+ DataOutput out
+ ) throws IOException {
try {
- iterator.getCurrentMessage().write(out);
+ message.write(out);
} catch (NegativeArraySizeException e) {
- handleNegativeArraySize(iterator.getCurrentVertexId());
+ handleNegativeArraySize(vertexId);
}
}
@@ -59,8 +83,8 @@ public class VerboseByteStructMessageWrite {
* @param vertexId vertexId
* @param <I> vertexId type
*/
- public static <I extends WritableComparable> void handleNegativeArraySize(
- I vertexId) {
+ public static <I extends WritableComparable>
+ void handleNegativeArraySize(I vertexId) {
throw new RuntimeException("The numbers of bytes sent to vertex " +
vertexId + " exceeded the max capacity of " +
"its ExtendedDataOutput. Please consider setting " +
http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index cdb9b7e..c51521d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -183,7 +183,7 @@ public class BspServiceWorker<I extends WritableComparable,
private GiraphTimer waitRequestsTimer;
/** InputSplit handlers used in INPUT_SUPERSTEP */
- private WorkerInputSplitsHandler inputSplitsHandler;
+ private final WorkerInputSplitsHandler inputSplitsHandler;
/** Memory observer */
private final MemoryObserver memoryObserver;
@@ -1784,6 +1784,31 @@ else[HADOOP_NON_SECURE]*/
}
@Override
+ public Iterable<Integer> getPartitionIds() {
+ return getPartitionStore().getPartitionIds();
+ }
+
+ @Override
+ public long getPartitionVertexCount(Integer partitionId) {
+ return getPartitionStore().getPartitionVertexCount(partitionId);
+ }
+
+ @Override
+ public void startIteration() {
+ getPartitionStore().startIteration();
+ }
+
+ @Override
+ public Partition getNextPartition() {
+ return getPartitionStore().getNextPartition();
+ }
+
+ @Override
+ public void putPartition(Partition partition) {
+ getPartitionStore().putPartition(partition);
+ }
+
+ @Override
public ServerData<I, V, E> getServerData() {
return workerServer.getServerData();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java
index e3b2db0..2c5f2db 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestIntFloatPrimitiveMessageStores.java
@@ -25,7 +25,7 @@ import junit.framework.Assert;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.combiner.FloatSumMessageCombiner;
-import org.apache.giraph.comm.messages.primitives.IntByteArrayMessageStore;
+import org.apache.giraph.comm.messages.primitives.IdByteArrayMessageStore;
import org.apache.giraph.comm.messages.primitives.IntFloatMessageStore;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
@@ -71,8 +71,10 @@ public class TestIntFloatPrimitiveMessageStores {
);
PartitionStore partitionStore = Mockito.mock(PartitionStore.class);
Mockito.when(service.getPartitionStore()).thenReturn(partitionStore);
+ Mockito.when(service.getPartitionIds()).thenReturn(
+ Lists.newArrayList(0, 1));
Mockito.when(partitionStore.getPartitionIds()).thenReturn(
- Lists.newArrayList(0, 1));
+ Lists.newArrayList(0, 1));
Partition partition = Mockito.mock(Partition.class);
Mockito.when(partition.getVertexCount()).thenReturn(Long.valueOf(1));
Mockito.when(partitionStore.getNextPartition()).thenReturn(partition);
@@ -144,8 +146,8 @@ public class TestIntFloatPrimitiveMessageStores {
@Test
public void testIntByteArrayMessageStore() {
- IntByteArrayMessageStore<FloatWritable> messageStore =
- new IntByteArrayMessageStore<FloatWritable>(new
+ IdByteArrayMessageStore<IntWritable, FloatWritable> messageStore =
+ new IdByteArrayMessageStore<>(new
TestMessageValueFactory<FloatWritable>(FloatWritable.class),
service, conf);
insertIntFloatMessages(messageStore);
http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
index dc9850b..5508b2c 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/messages/TestLongDoublePrimitiveMessageStores.java
@@ -25,7 +25,7 @@ import junit.framework.Assert;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.combiner.DoubleSumMessageCombiner;
-import org.apache.giraph.comm.messages.primitives.long_id.LongByteArrayMessageStore;
+import org.apache.giraph.comm.messages.primitives.IdByteArrayMessageStore;
import org.apache.giraph.comm.messages.primitives.LongDoubleMessageStore;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -68,8 +68,10 @@ public class TestLongDoublePrimitiveMessageStores {
);
PartitionStore partitionStore = Mockito.mock(PartitionStore.class);
Mockito.when(service.getPartitionStore()).thenReturn(partitionStore);
+ Mockito.when(service.getPartitionIds()).thenReturn(
+ Lists.newArrayList(0, 1));
Mockito.when(partitionStore.getPartitionIds()).thenReturn(
- Lists.newArrayList(0, 1));
+ Lists.newArrayList(0, 1));
Partition partition = Mockito.mock(Partition.class);
Mockito.when(partition.getVertexCount()).thenReturn(Long.valueOf(1));
Mockito.when(partitionStore.getNextPartition()).thenReturn(partition);
@@ -145,8 +147,8 @@ public class TestLongDoublePrimitiveMessageStores {
@Test
public void testLongByteArrayMessageStore() {
- LongByteArrayMessageStore<DoubleWritable> messageStore =
- new LongByteArrayMessageStore<DoubleWritable>(
+ IdByteArrayMessageStore<LongWritable, DoubleWritable> messageStore =
+ new IdByteArrayMessageStore<>(
new TestMessageValueFactory<DoubleWritable>(DoubleWritable.class),
service, createLongDoubleConf());
insertLongDoubleMessages(messageStore);
http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/test/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapperTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapperTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapperTest.java
index ffc1288..1294c88 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapperTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/messages/queue/AsyncMessageStoreWrapperTest.java
@@ -18,6 +18,14 @@
package org.apache.giraph.comm.messages.queue;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
import org.apache.giraph.comm.messages.MessageStore;
import org.apache.giraph.factories.TestMessageValueFactory;
import org.apache.giraph.utils.ByteArrayVertexIdMessages;
@@ -26,14 +34,6 @@ import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.junit.Test;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Arrays;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertNotNull;
-
/**
* Test case for AsyncMessageStoreWrapper
*/
@@ -71,6 +71,10 @@ public class AsyncMessageStoreWrapperTest {
}
@Override
+ public void addMessage(LongWritable vertexId, IntWritable message) throws IOException {
+ }
+
+ @Override
public boolean isPointerListEncoding() {
return false;
}
@@ -124,5 +128,6 @@ public class AsyncMessageStoreWrapperTest {
public void readFieldsForPartition(DataInput in, int partitionId) throws IOException {
}
+
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/2117d1db/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java b/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java
index d56c0fb..954e420 100644
--- a/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java
+++ b/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java
@@ -56,7 +56,7 @@ public class TestSwitchClasses {
graph.addVertex(id2, new StatusValue());
graph = InternalVertexRunner.runWithInMemoryOutput(conf, graph);
- Assert.assertEquals(2, graph.getVertices().size());
+ Assert.assertEquals(2, graph.getVertexCount());
}
private static void checkVerticesOnFinalSuperstep(