You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ed...@apache.org on 2016/03/15 18:40:36 UTC
[4/5] git commit: updated refs/heads/trunk to fafecee
http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/DiskBackedPartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/DiskBackedPartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/ooc/DiskBackedPartitionStore.java
deleted file mode 100644
index cdafa3f..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/ooc/DiskBackedPartitionStore.java
+++ /dev/null
@@ -1,2149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.ooc;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hashing;
-import org.apache.commons.lang3.tuple.MutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.giraph.bsp.CentralizedServiceWorker;
-import org.apache.giraph.comm.messages.MessageStore;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.conf.IntConfOption;
-import org.apache.giraph.edge.OutEdges;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.partition.Partition;
-import org.apache.giraph.partition.PartitionStore;
-import org.apache.giraph.utils.ByteArrayOneMessageToManyIds;
-import org.apache.giraph.utils.ByteArrayVertexIdEdges;
-import org.apache.giraph.utils.ByteArrayVertexIdMessages;
-import org.apache.giraph.utils.ExtendedDataOutput;
-import org.apache.giraph.utils.PairList;
-import org.apache.giraph.utils.VertexIdEdges;
-import org.apache.giraph.utils.VertexIdMessages;
-import org.apache.giraph.utils.VertexIterator;
-import org.apache.giraph.utils.WritableUtils;
-import org.apache.giraph.worker.BspServiceWorker;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.log4j.Logger;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static org.apache.giraph.conf.GiraphConstants.MAX_PARTITIONS_IN_MEMORY;
-import static org.apache.giraph.conf.GiraphConstants.ONE_MB;
-import static org.apache.giraph.conf.GiraphConstants.PARTITIONS_DIRECTORY;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-/**
- * Disk-backed PartitionStore. An instance of this class can be coupled with an
- * out-of-core engine. The out-of-core engine is responsible for determining
- * when to offload, and what to offload, to disk. The instance of this class
- * handles the interactions with disk.
- *
- * This class provides efficient scheduling mechanism while iterating over
- * partitions. It prefers spilling in-memory processed partitions, but the
- * scheduling can be improved upon further.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- */
-@SuppressWarnings("rawtypes")
-public class DiskBackedPartitionStore<I extends WritableComparable,
- V extends Writable, E extends Writable>
- extends PartitionStore<I, V, E> {
- /**
- * Minimum size of a buffer (in bytes) to flush to disk. This is used to
- * decide whether vertex/edge buffers are large enough to flush to disk.
- */
- public static final IntConfOption MINIMUM_BUFFER_SIZE_TO_FLUSH =
- new IntConfOption("giraph.flushBufferSize", 8 * ONE_MB,
- "Minimum size of a buffer (in bytes) to flush to disk. ");
-
- /** Class logger. */
- private static final Logger LOG =
- Logger.getLogger(DiskBackedPartitionStore.class);
-
- /** Cached value for MINIMUM_BUFFER_SIZE_TO_FLUSH */
- private final int minBuffSize;
- /**
- * States the partition can be found in:
- * INIT: the partition has just been created
- * ACTIVE: there is at least one thread who holds a reference to the partition
- * and uses it
- * INACTIVE: the partition is not being used by anyone, but it is in memory
- * IN_TRANSIT: the partition is being transferred to disk, the transfer is
- * not yet complete
- * ON_DISK: the partition resides on disk
- */
- private enum State { INIT, ACTIVE, INACTIVE, IN_TRANSIT, ON_DISK };
-
- /** Hash map containing all the partitions */
- private final ConcurrentMap<Integer, MetaPartition> partitions =
- Maps.newConcurrentMap();
-
- /**
- * Contains partitions that have been processed in the current iteration
- * cycle, and are not in use by any thread. The 'State' of these partitions
- * can only be INACTIVE, IN_TRANSIT, and ON_DISK.
- */
- private final Map<State, Set<Integer>> processedPartitions;
- /**
- * Contains partitions that have *not* been processed in the current iteration
- * cycle. Similar to processedPartitions, 'State' of these partitions can only
- * be INACTIVE, IN_TRANSIT, and ON_DISK.
- */
- private final Map<State, Set<Integer>> unProcessedPartitions;
-
- /**
- * Read/Write lock to avoid interleaving of the process of starting a new
- * iteration cycle and the process of spilling data to disk. This is necessary
- * as starting a new iteration changes the data structure holding data that is
- * being spilled to disk. Spilling of different data can happen at the same
- * time (a read lock used for spilling), and cannot be overlapped with
- * change of data structure holding the data.
- */
- private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
-
- /** Base path where the partition files are written to */
- private final String[] basePaths;
- /** Used to hash partition Ids */
- private final HashFunction hasher = Hashing.murmur3_32();
- /** Maximum number of partition slots in memory */
- private final AtomicInteger maxPartitionsInMem = new AtomicInteger(-1);
- /** Number of slots used */
- private final AtomicInteger numPartitionsInMem = new AtomicInteger(0);
-
- /** Out-of-core engine */
- private final OutOfCoreEngine oocEngine;
- /** If moving of edges to vertices in INPUT_SUPERSTEP has been started */
- private volatile boolean movingEdges;
- /** Whether the partition store is initialized */
- private volatile AtomicBoolean isInitialized;
- /** Whether the number of partitions are fixed as requested by user */
- private final boolean isNumPartitionsFixed;
-
- /**
- * Map of partition ids to list of input vertex buffers. The map will have an
- * entry only for partitions that are currently out-of-core. We keep the
- * aggregate size of buffers in as part of the values of the map to estimate
- * how much memory would be free if we offload this buffer to disk.
- */
- private final ConcurrentMap<Integer, Pair<Integer, List<ExtendedDataOutput>>>
- pendingInputVertices = Maps.newConcurrentMap();
- /**
- * When a partition is out-of-core, and we also offloaded some of its vertex
- * buffers, we have to keep track of how many buffers we offloaded to disk.
- * This contains this value for out-of-core partitions.
- */
- private final ConcurrentMap<Integer, Integer> numPendingInputVerticesOnDisk =
- Maps.newConcurrentMap();
- /** Lock to avoid overlap of addition and removal on pendingInputVertices */
- private final ReadWriteLock vertexBufferRWLock = new ReentrantReadWriteLock();
-
- /**
- * Similar to vertex buffer, but used for input edges (see comments for
- * pendingInputVertices).
- */
- private final ConcurrentMap<Integer, Pair<Integer, List<VertexIdEdges<I, E>>>>
- pendingInputEdges = Maps.newConcurrentMap();
- /** Similar to numPendingInputVerticesOnDisk but used for edge buffers */
- private final ConcurrentMap<Integer, Integer> numPendingInputEdgesOnDisk =
- Maps.newConcurrentMap();
- /** Lock to avoid overlap of addition and removal on pendingInputEdges */
- private final ReadWriteLock edgeBufferRWLock = new ReentrantReadWriteLock();
-
- /**
- * For each out-of-core partitions, whether its edge store is also
- * offloaded to disk in INPUT_SUPERSTEP.
- */
- private final ConcurrentMap<Integer, Boolean> hasEdgeStoreOnDisk =
- Maps.newConcurrentMap();
-
- /**
- * Type of VertexIdMessage class (container for serialized messages) received
- * for a particular message. If we write the received messages to disk before
- * adding them to message store, we need this type when we want to read the
- * messages back from disk (so that we know how to read the messages from
- * disk).
- */
- private enum SerializedMessageClass {
- /** ByteArrayVertexIdMessages */
- BYTE_ARRAY_VERTEX_ID_MESSAGES,
-    /** ByteArrayOneMessageToManyIds */
- BYTE_ARRAY_ONE_MESSAGE_TO_MANY_IDS
- }
-
- /**
- * Similar to vertex buffer and edge buffer, but used for messages (see
- * comments for pendingInputVertices).
- */
- private volatile ConcurrentMap<Integer,
- Pair<Integer, List<VertexIdMessages<I, Writable>>>>
- pendingIncomingMessages = Maps.newConcurrentMap();
- /** Whether a partition has any incoming message buffer on disk */
- private volatile ConcurrentMap<Integer, Boolean> incomingMessagesOnDisk =
- Maps.newConcurrentMap();
-
- /**
- * Similar to pendingIncomingMessages, but is used for messages for current
- * superstep instead.
- */
- private volatile ConcurrentMap<Integer,
- Pair<Integer, List<VertexIdMessages<I, Writable>>>>
- pendingCurrentMessages = Maps.newConcurrentMap();
- /** Similar to incomingMessagesOnDisk for messages for current superstep */
- private volatile ConcurrentMap<Integer, Boolean> currentMessagesOnDisk =
- Maps.newConcurrentMap();
-
- /**
- * Lock to avoid overlap of addition and removal of pending message buffers
- */
- private final ReadWriteLock messageBufferRWLock =
- new ReentrantReadWriteLock();
-
- /**
- * Constructor
- *
- * @param conf Configuration
- * @param context Context
- * @param serviceWorker service worker reference
- */
- public DiskBackedPartitionStore(
- ImmutableClassesGiraphConfiguration<I, V, E> conf,
- Mapper<?, ?, ?, ?>.Context context,
- CentralizedServiceWorker<I, V, E> serviceWorker) {
- super(conf, context, serviceWorker);
- this.minBuffSize = MINIMUM_BUFFER_SIZE_TO_FLUSH.get(conf);
- int userMaxNumPartitions = MAX_PARTITIONS_IN_MEMORY.get(conf);
- if (userMaxNumPartitions > 0) {
- this.isNumPartitionsFixed = true;
- this.maxPartitionsInMem.set(userMaxNumPartitions);
- oocEngine = null;
- } else {
- this.isNumPartitionsFixed = false;
- this.oocEngine =
- new AdaptiveOutOfCoreEngine<I, V, E>(conf, serviceWorker);
- }
- this.movingEdges = false;
- this.isInitialized = new AtomicBoolean(false);
-
- this.processedPartitions = Maps.newHashMap();
- this.processedPartitions
- .put(State.INACTIVE, Sets.<Integer>newLinkedHashSet());
- this.processedPartitions
- .put(State.IN_TRANSIT, Sets.<Integer>newLinkedHashSet());
- this.processedPartitions
- .put(State.ON_DISK, Sets.<Integer>newLinkedHashSet());
-
- this.unProcessedPartitions = Maps.newHashMap();
- this.unProcessedPartitions
- .put(State.INACTIVE, Sets.<Integer>newLinkedHashSet());
- this.unProcessedPartitions
- .put(State.IN_TRANSIT, Sets.<Integer>newLinkedHashSet());
- this.unProcessedPartitions
- .put(State.ON_DISK, Sets.<Integer>newLinkedHashSet());
-
- // Take advantage of multiple disks
- String[] userPaths = PARTITIONS_DIRECTORY.getArray(conf);
- basePaths = new String[userPaths.length];
- int i = 0;
- for (String path : userPaths) {
- basePaths[i++] = path + "/" + conf.get("mapred.job.id", "Unknown Job");
- }
- if (LOG.isInfoEnabled()) {
- LOG.info("DiskBackedPartitionStore with isStaticGraph=" +
- conf.isStaticGraph() + ((userMaxNumPartitions > 0) ?
- (" with maximum " + userMaxNumPartitions + " partitions in memory.") :
- "."));
- }
- }
-
- /**
- * @return maximum number of partitions allowed in memory
- */
- public int getNumPartitionSlots() {
- return maxPartitionsInMem.get();
- }
-
- /**
- * @return number of partitions in memory
- */
- public int getNumPartitionInMemory() {
- return numPartitionsInMem.get();
- }
-
- /**
- * Sets the maximum number of partitions allowed in memory
- *
- * @param numPartitions Number of partitions to allow in memory
- */
- public void setNumPartitionSlots(int numPartitions) {
- maxPartitionsInMem.set(numPartitions);
- }
-
- @Override
- public void initialize() {
- // "initialize" is called right before partition assignment in setup
- // process. However, it might be the case that this worker is a bit slow
- // and other workers start sending vertices/edges (in input superstep)
- // to this worker before the initialize is called. So, we put a guard in
- // necessary places to make sure the 'initialize' is called at a proper time
- // and also only once.
- if (isInitialized.compareAndSet(false, true)) {
- // Set the maximum number of partition slots in memory if unset
- if (maxPartitionsInMem.get() == -1) {
- maxPartitionsInMem.set(serviceWorker.getNumPartitionsOwned());
- // Check if master has not done partition assignment yet (may happen in
- // test codes)
- if (maxPartitionsInMem.get() == 0) {
- LOG.warn("initialize: partitions assigned to this worker is not " +
- "known yet");
- maxPartitionsInMem.set(partitions.size());
- if (maxPartitionsInMem.get() == 0) {
- maxPartitionsInMem.set(Integer.MAX_VALUE);
- }
- }
- if (LOG.isInfoEnabled()) {
- LOG.info("initialize: set the max number of partitions in memory " +
- "to " + maxPartitionsInMem.get());
- }
- oocEngine.initialize();
- }
- }
- }
-
- @Override
- public Iterable<Integer> getPartitionIds() {
- return Iterables.unmodifiableIterable(partitions.keySet());
- }
-
- @Override
- public boolean hasPartition(final Integer id) {
- return partitions.containsKey(id);
- }
-
- @Override
- public int getNumPartitions() {
- return partitions.size();
- }
-
- @Override
- public long getPartitionVertexCount(Integer partitionId) {
- MetaPartition meta = partitions.get(partitionId);
- if (meta == null) {
- return 0;
- } else if (meta.getState() == State.ON_DISK) {
- return meta.getVertexCount();
- } else {
- return meta.getPartition().getVertexCount();
- }
- }
-
- @Override
- public long getPartitionEdgeCount(Integer partitionId) {
- MetaPartition meta = partitions.get(partitionId);
- if (meta == null) {
- return 0;
- } else if (meta.getState() == State.ON_DISK) {
- return meta.getEdgeCount();
- } else {
- return meta.getPartition().getEdgeCount();
- }
- }
-
- /**
- * Spill one partition to disk.
- */
- @edu.umd.cs.findbugs.annotations.SuppressWarnings("TLW_TWO_LOCK_WAIT")
- private void swapOnePartitionToDisk() {
- Integer partitionId;
- // The only partitions in memory are IN_TRANSIT, ACTIVE, and INACTIVE ones.
- // If a partition is currently in transit, it means an OOC thread is
- // pushing the partition to disk, or a compute thread is swapping the
- // partition to open up space for another partition. So, such partitions
- // eventually will free up space in memory. However, this method is usually
- // called at critical points where freeing up space in memory is crucial.
- // So, we should look into a partition to swap amongst other in-memory
- // partitions. An in-memory partition that is not in-transit can be in
- // three states:
- // 1) already processed, which we can simply swap it to disk,
- // 2) non-processed but active (means someone is in the middle of
- // processing the partition). In this case we cannot touch the
- // partition until its processing is done.
- // 3) un-processed and inactive. It is bad to swap this partition to disk
- // as someone will load it again for processing in future. But, this
- // method is called to strictly swap a partition to disk. So, if there
- // is no partition in state 1, we should swap a partition in state 3 to
- // disk.
- rwLock.readLock().lock();
- synchronized (processedPartitions) {
- partitionId = popFromSet(processedPartitions.get(State.INACTIVE));
- }
- if (partitionId == null) {
- synchronized (unProcessedPartitions) {
- partitionId = popFromSet(unProcessedPartitions.get(State.INACTIVE));
- }
- if (partitionId == null) {
- // At this point some partitions are being processed and we should
- // wait until their processing is done
- synchronized (processedPartitions) {
- partitionId = popFromSet(processedPartitions.get(State.INACTIVE));
- while (partitionId == null) {
- try {
- // Here is the only place we wait on 'processedPartition', and
- // this wait is only for INACTIVE entry of the map. So, only at
- // times where a partition is added to INACTIVE entry of this map,
- // we should call '.notifyAll()'. Although this might seem a bad
- // practice, decoupling the INACTIVE entry from this map makes the
- // synchronization mechanism cumbersome and error-prone.
- processedPartitions.wait();
- } catch (InterruptedException e) {
- throw new IllegalStateException("swapOnePartitionToDisk: Caught" +
- "InterruptedException while waiting on a partition to" +
- "become inactive in memory and swapping it to disk");
- }
- partitionId = popFromSet(processedPartitions.get(State.INACTIVE));
- }
- }
- }
- }
-
- if (LOG.isInfoEnabled()) {
- LOG.info("swapOnePartitionToDisk: decided to swap partition " +
- partitionId + " to disk");
- }
- MetaPartition swapOutPartition = partitions.get(partitionId);
- checkNotNull(swapOutPartition,
- "swapOnePartitionToDisk: the partition is not found to spill to disk " +
- "(impossible)");
-
- // Since the partition is popped from the maps, it is not going to be
- // processed (or change its process state) until spilling of the partition
- // is done (the only way to access a partition is through
- // processedPartitions or unProcessedPartitions Map, so once a partition is
- // popped from a map, there is no need for synchronization on that
- // partition).
- Map<State, Set<Integer>> ownerMap = (swapOutPartition.isProcessed()) ?
- processedPartitions :
- unProcessedPartitions;
-
- // Here is the *only* place that holds a lock on an in-transit partition.
- // Anywhere else in the code should call wait() on the in-transit partition
- // to release the lock. This is an important optimization as we are no
- // longer have to keep the lock while partition is being transferred to
- // disk.
- synchronized (swapOutPartition) {
- swapOutPartition.setState(State.IN_TRANSIT);
- synchronized (ownerMap) {
- ownerMap.get(State.IN_TRANSIT).add(partitionId);
- }
- }
-
- try {
- if (LOG.isInfoEnabled()) {
- LOG.info("swapOnePartitionToDisk: start swapping partition " +
- partitionId + " to disk.");
- }
- offloadPartition(swapOutPartition);
- if (LOG.isInfoEnabled()) {
- LOG.info("swapOnePartitionToDisk: done swapping partition " +
- partitionId + " to disk.");
- }
- } catch (IOException e) {
- throw new IllegalStateException(
- "swapOnePartitionToDisk: Failed while offloading partition " +
- partitionId);
- }
-
- synchronized (swapOutPartition) {
- synchronized (ownerMap) {
- boolean stillInMap = ownerMap.get(State.IN_TRANSIT).remove(partitionId);
- swapOutPartition.setOnDisk();
- numPartitionsInMem.getAndDecrement();
- // If a compute thread gets an IN_TRANSIT partition (as the last resort
- // to get the next partition to process), 'swapOutPartition' may no
- // longer be in its map. But, if it is in its own map, we should update
- // the map.
- if (stillInMap) {
- ownerMap.get(State.ON_DISK).add(partitionId);
- }
- }
- // notifying all threads that are waiting for this partition to spill to
- // disk.
- swapOutPartition.notifyAll();
- }
- rwLock.readLock().unlock();
- }
-
- /**
- * Decrement maximum number of partitions allowed in memory by one and pushes
- * one partition to disk if necessary.
- */
- public void spillOnePartition() {
- if (maxPartitionsInMem.getAndDecrement() <= numPartitionsInMem.get()) {
- swapOnePartitionToDisk();
- }
- }
-
- @Override
- public Partition<I, V, E> getNextPartition() {
- Integer partitionId;
- // We prioritize accesses to currently in-memory partitions first. If there
- // is no such partition, we choose amongst on-disk partitions. This is a
- // preferable choice over in-transit partitions since we can start bringing
- // on-disk partitions to memory right away, while if we choose in-transit
- // partitions, we first have to wait for the transit to be complete, and
- // then bring the partition back in memory again.
- synchronized (unProcessedPartitions) {
- partitionId = popFromSet(unProcessedPartitions.get(State.INACTIVE));
- if (partitionId == null) {
- partitionId = popFromSet(unProcessedPartitions.get(State.ON_DISK));
- if (partitionId == null) {
- partitionId = popFromSet(unProcessedPartitions.get(State.IN_TRANSIT));
- }
- }
- }
-
- // Check if we are at the end of the current iteration cycle
- if (partitionId == null) {
- return null;
- }
-
- MetaPartition meta = partitions.get(partitionId);
- checkNotNull(meta, "getNextPartition: partition " + partitionId +
- " does not exist (impossible)");
-
- // The only time we iterate through all partitions in INPUT_SUPERSTEP is
- // when we want to move
- // edges from edge store to vertices. So, we check if we have anything in
- // edge store for the chosen partition, and if there is no edge store for
- // this partition, we skip processing it. This avoids unnecessary loading
- // of on-disk partitions that does not have edge store.
- if (movingEdges) {
- boolean shouldProcess = false;
- synchronized (meta) {
- if (meta.getState() == State.INACTIVE) {
- shouldProcess = edgeStore.hasPartitionEdges(partitionId);
- } else { // either ON_DISK or IN_TRANSIT
- Integer numBuf = numPendingInputEdgesOnDisk.get(partitionId);
- Boolean hasStore = hasEdgeStoreOnDisk.get(partitionId);
- shouldProcess =
- (numBuf != null && numBuf != 0) || (hasStore != null && hasStore);
- }
- if (!shouldProcess) {
- meta.setProcessed(true);
- synchronized (processedPartitions) {
- processedPartitions.get(meta.getState()).add(partitionId);
- if (meta.getState() == State.INACTIVE) {
- processedPartitions.notifyAll();
- }
- }
- }
- }
- if (!shouldProcess) {
- return getNextPartition();
- }
- }
- getPartition(meta);
- return meta.getPartition();
- }
-
- /**
- * Method that gets a partition from the store.
- * The partition is produced as a side effect of the computation and is
- * reflected inside the META object provided as parameter.
- * This function is thread-safe since it locks the whole computation
- * on the MetaPartition provided.
- *
- * When a thread tries to access an element on disk, it waits until space
- * becomes available in memory by swapping partitions to disk.
- *
- * @param meta meta partition container with the partition itself
- */
- private void getPartition(MetaPartition meta) {
- int partitionId = meta.getId();
- synchronized (meta) {
- boolean partitionInMemory = false;
- while (!partitionInMemory) {
- switch (meta.getState()) {
- case INACTIVE:
- // Check if the message store for the current superstep is in memory,
- // and if not, load it from the disk.
- Boolean messagesOnDisk = currentMessagesOnDisk.get(partitionId);
- if (messagesOnDisk != null && messagesOnDisk) {
- try {
- loadMessages(partitionId);
- } catch (IOException e) {
- throw new IllegalStateException("getPartition: failed while " +
- "loading messages of current superstep for partition " +
- partitionId);
- }
- }
- meta.setState(State.ACTIVE);
- partitionInMemory = true;
- break;
- case IN_TRANSIT:
- try {
- // Wait until the partition transfer to disk is complete
- meta.wait();
- } catch (InterruptedException e) {
- throw new IllegalStateException("getPartition: exception " +
- "while waiting on IN_TRANSIT partition " + partitionId + " to" +
- " fully spill to disk.");
- }
- break;
- case ON_DISK:
- boolean spaceAvailable = false;
-
- while (numPartitionsInMem.get() >= maxPartitionsInMem.get()) {
- swapOnePartitionToDisk();
- }
-
- // Reserve the space in memory for the partition
- if (numPartitionsInMem.incrementAndGet() <=
- maxPartitionsInMem.get()) {
- spaceAvailable = true;
- } else {
- numPartitionsInMem.decrementAndGet();
- }
-
- if (spaceAvailable) {
- Partition<I, V, E> partition;
- try {
- if (LOG.isInfoEnabled()) {
- LOG.info("getPartition: start reading partition " +
- partitionId + " from disk");
- }
- partition = loadPartition(meta);
- if (LOG.isInfoEnabled()) {
- LOG.info("getPartition: done reading partition " +
- partitionId + " from disk");
- }
- } catch (IOException e) {
- LOG.error("getPartition: Failed while Loading Partition " +
- "from disk: " + e.getMessage());
- throw new IllegalStateException(e);
- }
- meta.setActive(partition);
- partitionInMemory = true;
- }
- break;
- default:
- throw new IllegalStateException("illegal state " + meta.getState() +
- " for partition " + meta.getId());
- }
- }
- }
- }
-
- @Override
- public void prepareSuperstep() {
- rwLock.writeLock().lock();
- super.prepareSuperstep();
- pendingCurrentMessages = pendingIncomingMessages;
- currentMessagesOnDisk = incomingMessagesOnDisk;
- pendingIncomingMessages = Maps.newConcurrentMap();
- incomingMessagesOnDisk = Maps.newConcurrentMap();
- rwLock.writeLock().unlock();
- }
-
- /**
- * Spill message buffers (either buffers for messages for current superstep,
- * or buffers for incoming messages) of a given partition to disk. Note that
- * the partition should be ON_DISK or IN_TRANSIT.
- *
- * @param partitionId Id of the partition to spill its message buffers
- * @throws IOException
- */
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(
- "UL_UNRELEASED_LOCK_EXCEPTION_PATH")
- public void spillPartitionMessages(Integer partitionId) throws IOException {
- rwLock.readLock().lock();
- spillMessages(partitionId, pendingCurrentMessages,
- serviceWorker.getSuperstep());
- spillMessages(partitionId, pendingIncomingMessages,
- serviceWorker.getSuperstep() + 1);
- rwLock.readLock().unlock();
- }
-
- /**
- * Spill message buffers of a particular type of message (current or incoming
- * buffer) for a partition to disk.
- *
- * @param partitionId Id of the partition to spill the messages for
- * @param pendingMessages The map to get the message buffers from
- * @param superstep Superstep of which we want to offload messages. This is
- * equal to current superstep number if we want to offload
- * buffers for currentMessageStore, and is equal to next
- * superstep number if we want to offload buffer for
- * incomingMessageStore
- * @throws IOException
- */
- private void spillMessages(Integer partitionId,
- ConcurrentMap<Integer, Pair<Integer, List<VertexIdMessages<I, Writable>>>>
- pendingMessages, long superstep) throws IOException {
- Pair<Integer, List<VertexIdMessages<I, Writable>>> entry;
- messageBufferRWLock.writeLock().lock();
- entry = pendingMessages.remove(partitionId);
- if (entry != null && entry.getLeft() < minBuffSize) {
- pendingMessages.put(partitionId, entry);
- entry = null;
- }
- messageBufferRWLock.writeLock().unlock();
-
- if (entry == null) {
- return;
- }
-
- // Sanity check
- checkState(!entry.getRight().isEmpty(),
- "spillMessages: the message buffer that is supposed to be flushed to " +
- "disk does not exist.");
-
- File file = new File(getPendingMessagesBufferPath(partitionId, superstep));
-
- FileOutputStream fos = new FileOutputStream(file, true);
- BufferedOutputStream bos = new BufferedOutputStream(fos);
- DataOutputStream dos = new DataOutputStream(bos);
- for (VertexIdMessages<I, Writable> messages : entry.getRight()) {
- SerializedMessageClass messageClass;
- if (messages instanceof ByteArrayVertexIdMessages) {
- messageClass = SerializedMessageClass.BYTE_ARRAY_VERTEX_ID_MESSAGES;
- } else if (messages instanceof ByteArrayOneMessageToManyIds) {
- messageClass =
- SerializedMessageClass.BYTE_ARRAY_ONE_MESSAGE_TO_MANY_IDS;
- } else {
- throw new IllegalStateException("spillMessages: serialized message " +
- "type is not supported");
- }
- dos.writeInt(messageClass.ordinal());
- messages.write(dos);
- }
- dos.close();
- }
-
- /**
- * Looks through all partitions already on disk, and see if any of them has
- * enough pending message in its buffer in memory. This can be message buffer
- * of current superstep, or incoming superstep. If so, put that partition
- * along with an approximate amount of memory it took (in bytes) in a list to
- * return.
-
- * @return List of pairs (partitionId, sizeInByte) of the partitions where
- * their pending messages are worth flushing to disk
- */
- public PairList<Integer, Integer> getOocPartitionIdsWithPendingMessages() {
- PairList<Integer, Integer> pairList = new PairList<>();
- pairList.initialize();
- Set<Integer> partitionIds = Sets.newHashSet();
- // First, iterating over pending incoming messages
- if (pendingIncomingMessages != null) {
- for (Entry<Integer, Pair<Integer, List<VertexIdMessages<I, Writable>>>>
- entry : pendingIncomingMessages.entrySet()) {
- if (entry.getValue().getLeft() > minBuffSize) {
- pairList.add(entry.getKey(), entry.getValue().getLeft());
- partitionIds.add(entry.getKey());
- }
- }
- }
- // Second, iterating over pending current messages (i.e. pending incoming
- // messages of previous superstep)
- if (pendingCurrentMessages != null) {
- for (Entry<Integer, Pair<Integer, List<VertexIdMessages<I, Writable>>>>
- entry : pendingCurrentMessages.entrySet()) {
- if (entry.getValue().getLeft() > minBuffSize &&
- !partitionIds.contains(entry.getKey())) {
- pairList.add(entry.getKey(), entry.getValue().getLeft());
- }
- }
- }
- return pairList;
- }
-
- @Override
- public <M extends Writable> void addPartitionCurrentMessages(
- int partitionId, VertexIdMessages<I, M> messages) throws IOException {
- // Current messages are only added to the store in the event of partition
- // migration. Presumably the partition has just migrated and its data is
- // still available in memory. Note that partition migration only happens at
- // the beginning of a superstep.
- ((MessageStore<I, M>) currentMessageStore)
- .addPartitionMessages(partitionId, messages);
- }
-
- @Override
- public <M extends Writable> void addPartitionIncomingMessages(
- int partitionId, VertexIdMessages<I, M> messages) throws IOException {
- if (conf.getIncomingMessageClasses().useMessageCombiner()) {
- ((MessageStore<I, M>) incomingMessageStore)
- .addPartitionMessages(partitionId, messages);
- } else {
- MetaPartition meta = partitions.get(partitionId);
- checkNotNull(meta, "addPartitionIncomingMessages: trying to add " +
- "messages to partition " + partitionId + " which does not exist " +
- "in the partition set of this worker!");
-
- synchronized (meta) {
- switch (meta.getState()) {
- case INACTIVE:
- case ACTIVE:
- // A partition might be in memory, but its message store might still
- // be on disk. This happens because while we are loading the partition
- // to memory, we only load its current messages, not the incoming
- // messages. If a new superstep has been started, while the partition
- // is still in memory, the incoming message store in the previous
- // superstep (which is the current messages in the current superstep)
- // is on disk.
- // This may also happen when a partition is offloaded to disk while
- // it was unprocessed, and then again loaded in the same superstep for
- // processing.
- Boolean isMsgOnDisk = incomingMessagesOnDisk.get(partitionId);
- if (isMsgOnDisk == null || !isMsgOnDisk) {
- ((MessageStore<I, M>) incomingMessageStore)
- .addPartitionMessages(partitionId, messages);
- break;
- }
- // Continue to IN_TRANSIT and ON_DISK cases as the partition is in
- // memory, but it's messages are not yet loaded
- // CHECKSTYLE: stop FallThrough
- case IN_TRANSIT:
- case ON_DISK:
- // CHECKSTYLE: resume FallThrough
- List<VertexIdMessages<I, Writable>> newMessages =
- new ArrayList<VertexIdMessages<I, Writable>>();
- newMessages.add((VertexIdMessages<I, Writable>) messages);
- int length = messages.getSerializedSize();
- Pair<Integer, List<VertexIdMessages<I, Writable>>> newPair =
- new MutablePair<>(length, newMessages);
- messageBufferRWLock.readLock().lock();
- Pair<Integer, List<VertexIdMessages<I, Writable>>> oldPair =
- pendingIncomingMessages.putIfAbsent(partitionId, newPair);
- if (oldPair != null) {
- synchronized (oldPair) {
- MutablePair<Integer, List<VertexIdMessages<I, Writable>>> pair =
- (MutablePair<Integer, List<VertexIdMessages<I, Writable>>>)
- oldPair;
- pair.setLeft(pair.getLeft() + length);
- pair.getRight().add((VertexIdMessages<I, Writable>) messages);
- }
- }
- messageBufferRWLock.readLock().unlock();
- // In the case that the number of partitions is asked to be fixed by
- // the user, we should offload the message buffers as necessary.
- if (isNumPartitionsFixed &&
- pendingIncomingMessages.get(partitionId).getLeft() >
- minBuffSize) {
- try {
- spillPartitionMessages(partitionId);
- } catch (IOException e) {
- throw new IllegalStateException("addPartitionIncomingMessages: " +
- "spilling message buffers for partition " + partitionId +
- " failed!");
- }
- }
- break;
- default:
- throw new IllegalStateException("addPartitionIncomingMessages: " +
- "illegal state " + meta.getState() + " for partition " +
- meta.getId());
- }
- }
- }
- }
-
- /**
- * Spills edge store generated for specified partition in INPUT_SUPERSTEP
- * Note that the partition should be ON_DISK or IN_TRANSIT.
- *
- * @param partitionId Id of partition to spill its edge buffer
- */
- public void spillPartitionInputEdgeStore(Integer partitionId)
- throws IOException {
- rwLock.readLock().lock();
- if (movingEdges) {
- rwLock.readLock().unlock();
- return;
- }
- Pair<Integer, List<VertexIdEdges<I, E>>> entry;
-
- // Look at the comment for the similar logic in
- // 'spillPartitionInputVertexBuffer' for why this lock is necessary.
- edgeBufferRWLock.writeLock().lock();
- entry = pendingInputEdges.remove(partitionId);
- edgeBufferRWLock.writeLock().unlock();
-
- // Check if the intermediate edge store has already been moved to vertices
- if (entry == null) {
- rwLock.readLock().unlock();
- return;
- }
-
- // Sanity check
- checkState(!entry.getRight().isEmpty(),
- "spillPartitionInputEdgeStore: the edge buffer that is supposed to " +
- "be flushed to disk does not exist.");
-
- List<VertexIdEdges<I, E>> bufferList = entry.getRight();
- Integer numBuffers = numPendingInputEdgesOnDisk.putIfAbsent(partitionId,
- bufferList.size());
- if (numBuffers != null) {
- numPendingInputEdgesOnDisk.replace(partitionId,
- numBuffers + bufferList.size());
- }
-
- File file = new File(getPendingEdgesBufferPath(partitionId));
- FileOutputStream fos = new FileOutputStream(file, true);
- BufferedOutputStream bos = new BufferedOutputStream(fos);
- DataOutputStream dos = new DataOutputStream(bos);
- for (VertexIdEdges<I, E> edges : entry.getRight()) {
- edges.write(dos);
- }
- dos.close();
- rwLock.readLock().unlock();
- }
-
- /**
- * Looks through all partitions already on disk, and see if any of them has
- * enough pending edges in its buffer in memory. If so, put that
- * partition along with an approximate amount of memory it took (in bytes) in
- * a list to return.
-
- * @return List of pairs (partitionId, sizeInByte) of the partitions where
- * their pending edge store in input superstep in worth flushing to
- * disk
- */
- public PairList<Integer, Integer> getOocPartitionIdsWithPendingInputEdges() {
- PairList<Integer, Integer> pairList = new PairList<>();
- pairList.initialize();
- if (!movingEdges) {
- for (Entry<Integer, Pair<Integer, List<VertexIdEdges<I, E>>>> entry :
- pendingInputEdges.entrySet()) {
- if (entry.getValue().getLeft() > minBuffSize) {
- pairList.add(entry.getKey(), entry.getValue().getLeft());
- }
- }
- }
- return pairList;
- }
-
- @Override
- @edu.umd.cs.findbugs.annotations.SuppressWarnings("SF_SWITCH_FALLTHROUGH")
- public void addPartitionEdges(Integer partitionId,
- VertexIdEdges<I, E> edges) {
- if (!isInitialized.get()) {
- initialize();
- }
-
- MetaPartition meta = new MetaPartition(partitionId);
- MetaPartition temp = partitions.putIfAbsent(partitionId, meta);
- if (temp != null) {
- meta = temp;
- }
-
- boolean createPartition = false;
- synchronized (meta) {
- switch (meta.getState()) {
- case INIT:
- Partition<I, V, E> partition =
- conf.createPartition(partitionId, context);
- meta.setPartition(partition);
- // This is set to processed so that in the very next iteration cycle,
- // when startIteration is called, all partitions seem to be processed
- // and ready for the next iteration cycle. Otherwise, startIteration
- // fails in its sanity check due to finding an unprocessed partition.
- meta.setProcessed(true);
- numPartitionsInMem.getAndIncrement();
- meta.setState(State.INACTIVE);
- synchronized (processedPartitions) {
- processedPartitions.get(State.INACTIVE).add(partitionId);
- processedPartitions.notifyAll();
- }
- createPartition = true;
- // Continue to INACTIVE case to add the edges to the partition
- // CHECKSTYLE: stop FallThrough
- case INACTIVE:
- // CHECKSTYLE: resume FallThrough
- edgeStore.addPartitionEdges(partitionId, edges);
- break;
- case IN_TRANSIT:
- case ON_DISK:
- // Adding edges to in-memory buffer of the partition
- List<VertexIdEdges<I, E>> newEdges =
- new ArrayList<VertexIdEdges<I, E>>();
- newEdges.add(edges);
- int length = edges.getSerializedSize();
- Pair<Integer, List<VertexIdEdges<I, E>>> newPair =
- new MutablePair<>(length, newEdges);
- edgeBufferRWLock.readLock().lock();
- Pair<Integer, List<VertexIdEdges<I, E>>> oldPair =
- pendingInputEdges.putIfAbsent(partitionId, newPair);
- if (oldPair != null) {
- synchronized (oldPair) {
- MutablePair<Integer, List<VertexIdEdges<I, E>>> pair =
- (MutablePair<Integer, List<VertexIdEdges<I, E>>>) oldPair;
- pair.setLeft(pair.getLeft() + length);
- pair.getRight().add(edges);
- }
- }
- edgeBufferRWLock.readLock().unlock();
- // In the case that the number of partitions is asked to be fixed by the
- // user, we should offload the edge store as necessary.
- if (isNumPartitionsFixed &&
- pendingInputEdges.get(partitionId).getLeft() > minBuffSize) {
- try {
- spillPartitionInputEdgeStore(partitionId);
- } catch (IOException e) {
- throw new IllegalStateException("addPartitionEdges: spilling " +
- "edge store for partition " + partitionId + " failed!");
- }
- }
- break;
- default:
- throw new IllegalStateException("illegal state " + meta.getState() +
- " for partition " + meta.getId());
- }
- }
- // If creation of a new partition is violating the policy of maximum number
- // of partitions in memory, we should spill a partition to disk.
- if (createPartition &&
- numPartitionsInMem.get() > maxPartitionsInMem.get()) {
- swapOnePartitionToDisk();
- }
- }
-
- /**
- * Spills vertex buffer generated for specified partition in INPUT_SUPERSTEP
- * Note that the partition should be ON_DISK or IN_TRANSIT.
- *
- * @param partitionId Id of partition to spill its vertex buffer
- */
- public void spillPartitionInputVertexBuffer(Integer partitionId)
- throws IOException {
- rwLock.readLock().lock();
- if (movingEdges) {
- rwLock.readLock().unlock();
- return;
- }
- Pair<Integer, List<ExtendedDataOutput>> entry;
- // Synchronization on the concurrent map is necessary to avoid inconsistent
- // structure while execution of this method is interleaved with the
- // execution of addPartitionVertices. For instance, consider
- // the case where a thread wants to modify the value of an entry in
- // addPartitionVertices while another thread is running this
- // method removing the entry from the map. If removal and offloading the
- // entry's value to disk happens first, the modification by former thread
- // would be lost.
- vertexBufferRWLock.writeLock().lock();
- entry = pendingInputVertices.remove(partitionId);
- vertexBufferRWLock.writeLock().unlock();
-
- // Check if vertex buffer has already been merged with the partition
- if (entry == null) {
- rwLock.readLock().unlock();
- return;
- }
- // Sanity check
- checkState(!entry.getRight().isEmpty(),
- "spillPartitionInputVertexBuffer: the vertex buffer that is " +
- "supposed to be flushed to disk does not exist.");
-
- List<ExtendedDataOutput> bufferList = entry.getRight();
- Integer numBuffers = numPendingInputVerticesOnDisk.putIfAbsent(partitionId,
- bufferList.size());
- if (numBuffers != null) {
- numPendingInputVerticesOnDisk.replace(partitionId,
- numBuffers + bufferList.size());
- }
-
- File file = new File(getPendingVerticesBufferPath(partitionId));
- FileOutputStream fos = new FileOutputStream(file, true);
- BufferedOutputStream bos = new BufferedOutputStream(fos);
- DataOutputStream dos = new DataOutputStream(bos);
- for (ExtendedDataOutput extendedDataOutput : bufferList) {
- WritableUtils.writeExtendedDataOutput(extendedDataOutput, dos);
- }
- dos.close();
- rwLock.readLock().unlock();
- }
-
- /**
- * Looks through all partitions already on disk, and see if any of them has
- * enough pending vertices in its buffer in memory. If so, put that
- * partition along with an approximate amount of memory it took (in bytes) in
- * a list to return.
- *
- * @return List of pairs (partitionId, sizeInByte) of the partitions where
- * their pending vertex buffer in input superstep is worth flushing to
- * disk
- */
- public PairList<Integer, Integer>
- getOocPartitionIdsWithPendingInputVertices() {
- PairList<Integer, Integer> pairList = new PairList<>();
- pairList.initialize();
- if (!movingEdges) {
- for (Entry<Integer, Pair<Integer, List<ExtendedDataOutput>>> entry :
- pendingInputVertices.entrySet()) {
- if (entry.getValue().getLeft() > minBuffSize) {
- pairList.add(entry.getKey(), entry.getValue().getLeft());
- }
- }
- }
- return pairList;
- }
-
- @Override
- @edu.umd.cs.findbugs.annotations.SuppressWarnings("SF_SWITCH_FALLTHROUGH")
- public void addPartitionVertices(Integer partitionId,
- ExtendedDataOutput extendedDataOutput) {
- if (!isInitialized.get()) {
- initialize();
- }
-
- MetaPartition meta = new MetaPartition(partitionId);
- MetaPartition temp = partitions.putIfAbsent(partitionId, meta);
- if (temp != null) {
- meta = temp;
- }
-
- boolean createPartition = false;
- synchronized (meta) {
- switch (meta.getState()) {
- case INIT:
- Partition<I, V, E> partition =
- conf.createPartition(partitionId, context);
- meta.setPartition(partition);
- // Look at the comments in 'addPartitionVertices' for why we set the
- // this to true.
- meta.setProcessed(true);
- numPartitionsInMem.getAndIncrement();
- meta.setState(State.INACTIVE);
- synchronized (processedPartitions) {
- processedPartitions.get(State.INACTIVE).add(partitionId);
- processedPartitions.notifyAll();
- }
- createPartition = true;
- // Continue to INACTIVE case to add the vertices to the partition
- // CHECKSTYLE: stop FallThrough
- case INACTIVE:
- // CHECKSTYLE: resume FallThrough
- meta.getPartition().addPartitionVertices(
- new VertexIterator<I, V, E>(extendedDataOutput, conf));
- break;
- case IN_TRANSIT:
- case ON_DISK:
- // Adding vertices to in-memory buffer of the partition
- List<ExtendedDataOutput> vertices = new ArrayList<ExtendedDataOutput>();
- vertices.add(extendedDataOutput);
- int length = extendedDataOutput.getPos();
- Pair<Integer, List<ExtendedDataOutput>> newPair =
- new MutablePair<>(length, vertices);
- vertexBufferRWLock.readLock().lock();
- Pair<Integer, List<ExtendedDataOutput>> oldPair =
- pendingInputVertices.putIfAbsent(partitionId, newPair);
- if (oldPair != null) {
- synchronized (oldPair) {
- MutablePair<Integer, List<ExtendedDataOutput>> pair =
- (MutablePair<Integer, List<ExtendedDataOutput>>) oldPair;
- pair.setLeft(pair.getLeft() + length);
- pair.getRight().add(extendedDataOutput);
- }
- }
- vertexBufferRWLock.readLock().unlock();
- // In the case that the number of partitions is asked to be fixed by the
- // user, we should offload the edge store as necessary.
- if (isNumPartitionsFixed &&
- pendingInputVertices.get(partitionId).getLeft() > minBuffSize) {
- try {
- spillPartitionInputVertexBuffer(partitionId);
- } catch (IOException e) {
- throw new IllegalStateException("addPartitionVertices: spilling " +
- "vertex buffer for partition " + partitionId + " failed!");
- }
- }
- break;
- default:
- throw new IllegalStateException("illegal state " + meta.getState() +
- " for partition " + meta.getId());
- }
- }
- // If creation of a new partition is violating the policy of maximum number
- // of partitions in memory, we should spill a partition to disk.
- if (createPartition &&
- numPartitionsInMem.get() > maxPartitionsInMem.get()) {
- swapOnePartitionToDisk();
- }
- }
-
- @Override
- public void putPartition(Partition<I, V, E> partition) {
- checkArgument(partition != null);
-
- Integer id = partition.getId();
- MetaPartition meta = partitions.get(id);
- checkNotNull(meta, "putPartition: partition to put does " +
- "not exist in the store (impossible)");
- synchronized (meta) {
- checkState(meta.getState() == State.ACTIVE,
- "It is not possible to put back a partition which is not ACTIVE. " +
- "meta = " + meta.toString());
-
- meta.setState(State.INACTIVE);
- meta.setProcessed(true);
- synchronized (processedPartitions) {
- processedPartitions.get(State.INACTIVE).add(id);
- // Notify OOC threads waiting for a partition to become available to put
- // on disk.
- processedPartitions.notifyAll();
- }
- }
- }
-
- @Override
- public Partition<I, V, E> removePartition(Integer partitionId) {
- if (hasPartition(partitionId)) {
- MetaPartition meta = partitions.remove(partitionId);
- // Since this method is called outside of the iteration cycle, all
- // partitions in the store should be in the processed state.
- checkState(processedPartitions.get(meta.getState()).remove(partitionId),
- "removePartition: partition that is about to remove is not in " +
- "processed list (impossible)");
-
- getPartition(meta);
- numPartitionsInMem.getAndDecrement();
- return meta.getPartition();
- }
- return null;
- }
-
- @Override
- public boolean addPartition(Partition<I, V, E> partition) {
- if (!isInitialized.get()) {
- initialize();
- }
-
- Integer id = partition.getId();
- MetaPartition meta = new MetaPartition(id);
- MetaPartition temp = partitions.putIfAbsent(id, meta);
- if (temp != null) {
- return false;
- }
-
- if (LOG.isInfoEnabled()) {
- LOG.info("addPartition: partition " + id + " is added to the store.");
- }
-
- meta.setPartition(partition);
- meta.setState(State.INACTIVE);
- meta.setProcessed(true);
- synchronized (processedPartitions) {
- processedPartitions.get(State.INACTIVE).add(id);
- processedPartitions.notifyAll();
- }
- numPartitionsInMem.getAndIncrement();
- // Swapping partitions to disk if addition of this partition violates the
- // requirement on the number of partitions.
- if (numPartitionsInMem.get() > maxPartitionsInMem.get()) {
- swapOnePartitionToDisk();
- }
- return true;
- }
-
- @Override
- public void shutdown() {
- // Sanity check to check there is nothing left from previous superstep
- checkState(unProcessedPartitions.get(State.INACTIVE).isEmpty() &&
- unProcessedPartitions.get(State.IN_TRANSIT).isEmpty() &&
- unProcessedPartitions.get(State.ON_DISK).isEmpty(),
- "shutdown: There are some unprocessed partitions left from the " +
- "previous superstep. This should not be possible.");
-
- for (MetaPartition meta : partitions.values()) {
- synchronized (meta) {
- while (meta.getState() == State.IN_TRANSIT) {
- try {
- meta.wait();
- } catch (InterruptedException e) {
- throw new IllegalStateException("shutdown: exception while" +
- "waiting on an IN_TRANSIT partition to be written on disk");
- }
- }
- if (meta.getState() == State.ON_DISK) {
- deletePartitionFiles(meta.getId());
- }
- }
- }
-
- if (oocEngine != null) {
- oocEngine.shutdown();
- }
- }
-
- @Override
- public void startIteration() {
- if (!isInitialized.get()) {
- initialize();
- }
- if (LOG.isInfoEnabled()) {
- LOG.info("startIteration: with " + numPartitionsInMem.get() +
- " partitions in memory, there can be maximum " + maxPartitionsInMem +
- " partitions in memory out of " + partitions.size() + " that " +
- "belongs to this worker.");
- }
- // Sanity check to make sure nothing left unprocessed from previous
- // superstep
- checkState(unProcessedPartitions.get(State.INACTIVE).isEmpty() &&
- unProcessedPartitions.get(State.IN_TRANSIT).isEmpty() &&
- unProcessedPartitions.get(State.ON_DISK).isEmpty(),
- "startIteration: There are some unprocessed and/or " +
- "in-transition partitions left from the previous superstep. " +
- "This should not be possible.");
-
- rwLock.writeLock().lock();
- for (MetaPartition meta : partitions.values()) {
- // Sanity check
- checkState(meta.isProcessed(), "startIteration: " +
- "meta-partition " + meta + " has not been processed in the " +
- "previous superstep.");
-
- // The only case where a partition can be IN_TRANSIT is where it is still
- // being offloaded to disk, and it happens only in swapOnePartitionToDisk,
- // where we at least hold a read lock while transfer is in progress. Since
- // the write lock is held in this section, no partition should be
- // IN_TRANSIT.
- checkState(meta.getState() != State.IN_TRANSIT,
- "startIteration: meta-partition " + meta + " is still IN_TRANSIT " +
- "(impossible)");
-
- meta.setProcessed(false);
- }
-
- unProcessedPartitions.clear();
- unProcessedPartitions.putAll(processedPartitions);
- processedPartitions.clear();
- processedPartitions
- .put(State.INACTIVE, Sets.<Integer>newLinkedHashSet());
- processedPartitions
- .put(State.IN_TRANSIT, Sets.<Integer>newLinkedHashSet());
- processedPartitions
- .put(State.ON_DISK, Sets.<Integer>newLinkedHashSet());
- rwLock.writeLock().unlock();
- LOG.info("startIteration: done preparing the iteration");
- }
-
- @Override
- public void moveEdgesToVertices() {
- movingEdges = true;
- edgeStore.moveEdgesToVertices();
- movingEdges = false;
- }
-
- /**
- * Pops an entry from the specified set. This is guaranteed that the set is
- * being accessed from within a lock.
- *
- * @param set set to pop an entry from
- * @return popped entry from the given set
- */
- private Integer popFromSet(Set<Integer> set) {
- if (!set.isEmpty()) {
- Iterator<Integer> it = set.iterator();
- Integer id = it.next();
- it.remove();
- return id;
- }
- return null;
- }
-
- /**
- * Allows more partitions to be stored in memory.
- *
- * @param numPartitionsToIncrease How many more partitions to allow in the
- * store
- */
- public void increasePartitionSlots(Integer numPartitionsToIncrease) {
- maxPartitionsInMem.getAndAdd(numPartitionsToIncrease);
- if (LOG.isInfoEnabled()) {
- LOG.info("increasePartitionSlots: allowing partition store to have " +
- numPartitionsToIncrease + " more partitions. Now, partition store " +
- "can have up to " + maxPartitionsInMem.get() + " partitions.");
- }
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- for (MetaPartition e : partitions.values()) {
- sb.append(e.toString() + "\n");
- }
- return sb.toString();
- }
-
- /**
- * Writes vertex data (Id, value and halted state) to stream.
- *
- * @param output The output stream
- * @param vertex The vertex to serialize
- * @throws IOException
- */
- private void writeVertexData(DataOutput output, Vertex<I, V, E> vertex)
- throws IOException {
-
- vertex.getId().write(output);
- vertex.getValue().write(output);
- output.writeBoolean(vertex.isHalted());
- }
-
- /**
- * Writes vertex edges (Id, edges) to stream.
- *
- * @param output The output stream
- * @param vertex The vertex to serialize
- * @throws IOException
- */
- @SuppressWarnings("unchecked")
- private void writeOutEdges(DataOutput output, Vertex<I, V, E> vertex)
- throws IOException {
-
- vertex.getId().write(output);
- OutEdges<I, E> edges = (OutEdges<I, E>) vertex.getEdges();
- edges.write(output);
- }
-
- /**
- * Read vertex data from an input and initialize the vertex.
- *
- * @param in The input stream
- * @param vertex The vertex to initialize
- * @throws IOException
- */
- private void readVertexData(DataInput in, Vertex<I, V, E> vertex)
- throws IOException {
-
- I id = conf.createVertexId();
- id.readFields(in);
- V value = conf.createVertexValue();
- value.readFields(in);
- OutEdges<I, E> edges = conf.createAndInitializeOutEdges(0);
- vertex.initialize(id, value, edges);
- if (in.readBoolean()) {
- vertex.voteToHalt();
- } else {
- vertex.wakeUp();
- }
- }
-
- /**
- * Read vertex edges from an input and set them to the vertex.
- *
- * @param in The input stream
- * @param partition The partition owning the vertex
- * @throws IOException
- */
- @SuppressWarnings("unchecked")
- private void readOutEdges(DataInput in, Partition<I, V, E> partition)
- throws IOException {
-
- I id = conf.createVertexId();
- id.readFields(in);
- Vertex<I, V, E> v = partition.getVertex(id);
- OutEdges<I, E> edges = (OutEdges<I, E>) v.getEdges();
- edges.readFields(in);
- partition.saveVertex(v);
- }
-
- /**
- * Load messages for a given partition for the current superstep to memory.
- *
- * @param partitionId Id of the partition to load the messages for
- * @throws IOException
- */
- private void loadMessages(int partitionId) throws IOException {
- // Messages for current superstep
- if (currentMessageStore != null &&
- !conf.getOutgoingMessageClasses().useMessageCombiner()) {
- checkState(!currentMessageStore.hasMessagesForPartition(partitionId),
- "loadMessages: partition " + partitionId + " is on disk, " +
- "but its message store is in memory (impossible)");
- // First, reading the message store for the partition if there is any
- File file = new File(
- getMessagesPath(partitionId, serviceWorker.getSuperstep()));
- if (file.exists()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("loadMessages: loading message store of partition " +
- partitionId);
- }
- FileInputStream filein = new FileInputStream(file);
- BufferedInputStream bufferin = new BufferedInputStream(filein);
- DataInputStream inputStream = new DataInputStream(bufferin);
- currentMessageStore.readFieldsForPartition(inputStream, partitionId);
- inputStream.close();
- checkState(file.delete(), "loadMessages: failed to delete %s.",
- file.getAbsolutePath());
- }
-
- messageBufferRWLock.writeLock().lock();
- Pair<Integer, List<VertexIdMessages<I, Writable>>> pendingMessages =
- pendingCurrentMessages.remove(partitionId);
- messageBufferRWLock.writeLock().unlock();
-
- // Second, reading message buffers (incoming messages in previous
- // superstep)
- file = new File(getPendingMessagesBufferPath(partitionId,
- serviceWorker.getSuperstep()));
- if (file.exists()) {
- FileInputStream filein = new FileInputStream(file);
- BufferedInputStream bufferin = new BufferedInputStream(filein);
- DataInputStream inputStream = new DataInputStream(bufferin);
- while (true) {
- int type;
- try {
- type = inputStream.readInt();
- } catch (EOFException e) {
- // Reached end of file, so all the records are read.
- break;
- }
- SerializedMessageClass messageClass =
- SerializedMessageClass.values()[type];
- VertexIdMessages<I, Writable> vim;
- switch (messageClass) {
- case BYTE_ARRAY_VERTEX_ID_MESSAGES:
- vim = new ByteArrayVertexIdMessages<>(
- conf.createOutgoingMessageValueFactory());
- vim.setConf(conf);
- break;
- case BYTE_ARRAY_ONE_MESSAGE_TO_MANY_IDS:
- vim = new ByteArrayOneMessageToManyIds<>(
- conf.createOutgoingMessageValueFactory());
- vim.setConf(conf);
- break;
- default:
- throw new IllegalStateException("loadMessages: unsupported " +
- "serialized message type!");
- }
- vim.readFields(inputStream);
- currentMessageStore.addPartitionMessages(partitionId, vim);
- }
- inputStream.close();
- checkState(!file.delete(), "loadMessages: failed to delete %s",
- file.getAbsolutePath());
- }
-
- // Third, applying message buffers already in memory
- if (pendingMessages != null) {
- for (VertexIdMessages<I, Writable> vim : pendingMessages.getRight()) {
- currentMessageStore.addPartitionMessages(partitionId, vim);
- }
- }
- currentMessagesOnDisk.put(partitionId, false);
- }
- }
-
- /**
- * Load a partition from disk. It deletes the files after the load,
- * except for the edges, if the graph is static.
- *
- * @param meta meta partition to load the partition of
- * @return The partition
- * @throws IOException
- */
- @SuppressWarnings("unchecked")
- private Partition<I, V, E> loadPartition(MetaPartition meta)
- throws IOException {
- Integer partitionId = meta.getId();
- long numVertices = meta.getVertexCount();
- Partition<I, V, E> partition = conf.createPartition(partitionId, context);
-
- // Vertices
- File file = new File(getVerticesPath(partitionId));
- if (LOG.isDebugEnabled()) {
- LOG.debug("loadPartition: loading partition vertices " +
- partition.getId() + " from " + file.getAbsolutePath());
- }
-
- FileInputStream filein = new FileInputStream(file);
- BufferedInputStream bufferin = new BufferedInputStream(filein);
- DataInputStream inputStream = new DataInputStream(bufferin);
- for (int i = 0; i < numVertices; ++i) {
- Vertex<I, V , E> vertex = conf.createVertex();
- readVertexData(inputStream, vertex);
- partition.putVertex(vertex);
- }
- inputStream.close();
- checkState(file.delete(), "loadPartition: failed to delete %s",
- file.getAbsolutePath());
-
- // Edges
- file = new File(getEdgesPath(partitionId));
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("loadPartition: loading partition edges " +
- partition.getId() + " from " + file.getAbsolutePath());
- }
-
- filein = new FileInputStream(file);
- bufferin = new BufferedInputStream(filein);
- inputStream = new DataInputStream(bufferin);
- for (int i = 0; i < numVertices; ++i) {
- readOutEdges(inputStream, partition);
- }
- inputStream.close();
- // If the graph is static and it is not INPUT_SUPERSTEP, keep the file
- // around.
- if (!conf.isStaticGraph() ||
- serviceWorker.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP) {
- checkState(file.delete(), "loadPartition: failed to delete %s",
- file.getAbsolutePath());
- }
-
- // Load message for the current superstep
- loadMessages(partitionId);
-
- // Input vertex buffers
- // First, applying vertex buffers on disk (since they came earlier)
- Integer numBuffers = numPendingInputVerticesOnDisk.remove(partitionId);
- if (numBuffers != null) {
- file = new File(getPendingVerticesBufferPath(partitionId));
- if (LOG.isDebugEnabled()) {
- LOG.debug("loadPartition: loading " + numBuffers + " input vertex " +
- "buffers of partition " + partitionId + " from " +
- file.getAbsolutePath());
- }
- filein = new FileInputStream(file);
- bufferin = new BufferedInputStream(filein);
- inputStream = new DataInputStream(bufferin);
- for (int i = 0; i < numBuffers; ++i) {
- ExtendedDataOutput extendedDataOutput =
- WritableUtils.readExtendedDataOutput(inputStream, conf);
- partition.addPartitionVertices(
- new VertexIterator<I, V, E>(extendedDataOutput, conf));
- }
- inputStream.close();
- checkState(file.delete(), "loadPartition: failed to delete %s",
- file.getAbsolutePath());
- }
- // Second, applying vertex buffers already in memory
- Pair<Integer, List<ExtendedDataOutput>> vertexPair;
- vertexBufferRWLock.writeLock().lock();
- vertexPair = pendingInputVertices.remove(partitionId);
- vertexBufferRWLock.writeLock().unlock();
- if (vertexPair != null) {
- for (ExtendedDataOutput extendedDataOutput : vertexPair.getRight()) {
- partition.addPartitionVertices(
- new VertexIterator<I, V, E>(extendedDataOutput, conf));
- }
- }
-
- // Edge store
- if (serviceWorker.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP) {
- checkState(hasEdgeStoreOnDisk.containsKey(partitionId),
- "loadPartition: partition is written to disk in INPUT_SUPERSTEP, " +
- "but it is not clear whether its edge store is on disk or not " +
- "(impossible)");
-
- if (hasEdgeStoreOnDisk.remove(partitionId)) {
- file = new File(getEdgeStorePath(partitionId));
- if (LOG.isDebugEnabled()) {
- LOG.debug("loadPartition: loading edge store of partition " +
- partitionId + " from " + file.getAbsolutePath());
- }
- filein = new FileInputStream(file);
- bufferin = new BufferedInputStream(filein);
- inputStream = new DataInputStream(bufferin);
- edgeStore.readPartitionEdgeStore(partitionId, inputStream);
- inputStream.close();
- checkState(file.delete(), "loadPartition: failed to delete %s",
- file.getAbsolutePath());
- }
-
- // Input edge buffers
- // First, applying edge buffers on disk (since they came earlier)
- numBuffers = numPendingInputEdgesOnDisk.remove(partitionId);
- if (numBuffers != null) {
- file = new File(getPendingEdgesBufferPath(partitionId));
- if (LOG.isDebugEnabled()) {
- LOG.debug("loadPartition: loading " + numBuffers + " input edge " +
- "buffers of partition " + partitionId + " from " +
- file.getAbsolutePath());
- }
- filein = new FileInputStream(file);
- bufferin = new BufferedInputStream(filein);
- inputStream = new DataInputStream(bufferin);
- for (int i = 0; i < numBuffers; ++i) {
- VertexIdEdges<I, E> vertexIdEdges =
- new ByteArrayVertexIdEdges<I, E>();
- vertexIdEdges.setConf(conf);
- vertexIdEdges.readFields(inputStream);
- edgeStore.addPartitionEdges(partitionId, vertexIdEdges);
- }
- inputStream.close();
- checkState(file.delete(), "loadPartition: failed to delete %s",
- file.getAbsolutePath());
- }
- // Second, applying edge buffers already in memory
- Pair<Integer, List<VertexIdEdges<I, E>>> edgePair = null;
- edgeBufferRWLock.writeLock().lock();
- edgePair = pendingInputEdges.remove(partitionId);
- edgeBufferRWLock.writeLock().unlock();
- if (edgePair != null) {
- for (VertexIdEdges<I, E> vertexIdEdges : edgePair.getRight()) {
- edgeStore.addPartitionEdges(partitionId, vertexIdEdges);
- }
- }
- }
- return partition;
- }
-
- /**
- * Write a partition to disk.
- *
- * @param meta meta partition containing the partition to offload
- * @throws IOException
- */
- private void offloadPartition(MetaPartition meta) throws IOException {
- Partition<I, V, E> partition = meta.getPartition();
- int partitionId = meta.getId();
- File file = new File(getVerticesPath(partitionId));
- File parent = file.getParentFile();
- if (!parent.exists() && !parent.mkdirs() && LOG.isDebugEnabled()) {
- LOG.debug("offloadPartition: directory " + parent.getAbsolutePath() +
- " already exists.");
- }
-
- checkState(file.createNewFile(),
- "offloadPartition: file %s already exists.", parent.getAbsolutePath());
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("offloadPartition: writing partition vertices " +
- partitionId + " to " + file.getAbsolutePath());
- }
-
- FileOutputStream fileout = new FileOutputStream(file);
- BufferedOutputStream bufferout = new BufferedOutputStream(fileout);
- DataOutputStream outputStream = new DataOutputStream(bufferout);
- for (Vertex<I, V, E> vertex : partition) {
- writeVertexData(outputStream, vertex);
- }
- outputStream.close();
-
- // Avoid writing back edges if we have already written them once and
- // the graph is not changing.
- // If we are in the input superstep, we need to write the files
- // at least the first time, even though the graph is static.
- file = new File(getEdgesPath(partitionId));
- if (serviceWorker.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP ||
- meta.getPrevVertexCount() != partition.getVertexCount() ||
- !conf.isStaticGraph() || !file.exists()) {
-
- meta.setPrevVertexCount(partition.getVertexCount());
-
- if (!file.createNewFile() && LOG.isDebugEnabled()) {
- LOG.debug("offloadPartition: file " + file.getAbsolutePath() +
- " already exists.");
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("offloadPartition: writing partition edges " +
- partitionId + " to " + file.getAbsolutePath());
- }
-
- fileout = new FileOutputStream(file);
- bufferout = new BufferedOutputStream(fileout);
- outputStream = new DataOutputStream(bufferout);
- for (Vertex<I, V, E> vertex : partition) {
- writeOutEdges(outputStream, vertex);
- }
- outputStream.close();
- }
-
- if (currentMessageStore != null &&
- !conf.getOutgoingMessageClasses().useMessageCombiner() &&
- currentMessageStore.hasMessagesForPartition(partitionId)) {
- writeMessageData(currentMessageStore, currentMessagesOnDisk, partitionId,
- serviceWorker.getSuperstep());
- }
- if (incomingMessageStore != null &&
- !conf.getIncomingMessageClasses().useMessageCombiner() &&
- incomingMessageStore.hasMessagesForPartition(partitionId)) {
- writeMessageData(incomingMessageStore, incomingMessagesOnDisk,
- partitionId, serviceWorker.getSuperstep() + 1);
- }
-
- // Writing edge store to disk in the input superstep
- if (serviceWorker.getSuperstep() == BspServiceWorker.INPUT_SUPERSTEP) {
- if (edgeStore.hasPartitionEdges(partitionId)) {
- hasEdgeStoreOnDisk.put(partitionId, true);
- file = new File(getEdgeStorePath(partitionId));
- if (!file.createNewFile() && LOG.isDebugEnabled()) {
- LOG.debug("offloadPartition: file " + file.getAbsolutePath() +
- " already exists.");
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("offloadPartition: writing partition edge store of " +
- partitionId + " to " + file.getAbsolutePath());
- }
-
- fileout = new FileOutputStream(file);
- bufferout = new BufferedOutputStream(fileout);
- outputStream = new DataOutputStream(bufferout);
- edgeStore.writePartitionEdgeStore(partitionId, outputStream);
- outputStream.close();
- } else {
- hasEdgeStoreOnDisk.put(partitionId, false);
- }
- }
- }
-
- /**
- * Offload message data of a particular type of store (current or incoming) to
- * disk.
- *
- * @param messageStore The message store to write to disk
- * @param messagesOnDisk Map to update and let others know that this message
- * store is on disk
- * @param partitionId Id of the partition we want to offload the message store
- * of
- * @param superstep Superstep for which we want to offload message data for.
- * It is equal the current superstep number for offloading
- * currentMessageStore, and is equal to next superstep
- * number for offloading incomingMessageStore
- * @throws IOException
- */
- private void writeMessageData(MessageStore<I, Writable> messageStore,
- ConcurrentMap<Integer, Boolean> messagesOnDisk, int partitionId,
- long superstep) throws IOException {
- File file = new File(getMessagesPath(partitionId, superstep));
- checkState(!file.exists(),
- "writeMessageData: message store file for partition " +
- partitionId + " for messages in superstep " +
- superstep + " already exist (impossible).");
-
- checkState(file.createNewFile(),
- "offloadPartition: cannot create message store file for " +
- "partition " + partitionId);
-
- FileOutputStream fileout = new FileOutputStream(file);
- BufferedOutputStream bufferout = new BufferedOutputStream(fileout);
- DataOutputStream outputStream = new DataOutputStream(bufferout);
- messageStore.writePartition(outputStream, partitionId);
- messageStore.clearPartition(partitionId);
- outputStream.close();
- messagesOnDisk.put(partitionId, true);
- }
-
- /**
- * Delete a partition's files.
- *
- * @param id The id of the partition owning the file.
- */
- private void deletePartitionFiles(Integer id) {
- // File containing vertices
- File file = new File(getVerticesPath(id));
- checkState(!file.exists() || file.delete(),
- "deletePartitionFiles: Failed to delete file " +
- file.getAbsolutePath());
-
- // File containing edges
- file = new File(getEdgesPath(id));
- checkState(!file.exists() || file.delete(),
- "deletePartitionFiles: Failed to delete file " +
- file.getAbsolutePath());
- }
-
- /**
- * Get the path and basename of the storage files.
- *
- * @param partitionId The partition
- * @return The path to the given partition
- */
- private String getPartitionPath(Integer partitionId) {
- int hash = hasher.hashInt(partitionId).asInt();
- int idx = Math.abs(hash % basePaths.length);
- return basePaths[idx] + "/partition-" + partitionId;
- }
-
- /**
- * Get the path to the file where vertices are stored.
- *
- * @param partitionId The partition
- * @return The path to the vertices file
- */
- private String getVerticesPath(Integer partitionId) {
- return getPartitionPath(partitionId) + "_vertices";
- }
-
- /**
- * Get the path to the file where pending vertices in INPUT_SUPERSTEP
- * are stored.
- *
- * @param partitionId The partition
- * @return The path to the file
- */
- private String getPendingVerticesBufferPath(Integer partitionId) {
- return getPartitionPath(partitionId) + "_pending_vertices";
- }
-
- /**
- * Get the path to the file where edge store of a partition in INPUT_SUPERSTEP
- * is stored.
- *
- * @param partitionId The partition
- * @return The path to the file
- */
- private String getEdgeStorePath(Integer partitionId) {
- return getPartitionPath(partitionId) + "_edge_store";
- }
-
- /**
- * Get the path to the file where pending edges in INPUT_SUPERSTEP
- * are stored.
- *
- * @param partitionId The partition
- * @return The path to the file
- */
- private String getPendingEdgesBufferPath(Integer partitionId) {
- return getPartitionPath(partitionId) + "_pending_edges";
- }
-
- /**
- * Get the path to the file where edges are stored.
- *
- * @param partitionId The partition
- * @return The path to the edges file
- */
- private String getEdgesPath(Integer partitionId) {
- return getPartitionPath(partitionId) + "_edges";
- }
-
- /**
- * Get the path to the file where pending incoming messages are stored.
- *
- * @param partitionId The partition
- * @param superstep superstep number
- * @return The path to the file
- */
- private String getPendingMessagesBufferPath(Integer partitionId,
- long superstep) {
- return getPartitionPath(partitionId) + "_pending_messages_" + superstep;
- }
-
- /**
- * Get the path to the file where messages are stored.
- *
- * @param partitionId The partition
- * @param superstep superstep number
- * @return The path to the file
- */
- private String getMessagesPath(Integer partitionId, long superstep) {
- return getPartitionPath(partitionId) + "_messages_" + superstep;
- }
-
- /**
- * Partition container holding additional meta data associated with each
- * partition.
- */
- private class MetaPartition {
- // ---- META INFORMATION ----
- /** ID of the partition */
- private int id;
- /** State in which the partition is */
- private State state;
- /** Number of vertices contained in the partition */
- private long vertexCount;
- /** Previous number of vertices contained in the partition */
- private long prevVertexCount;
- /** Number of edges contained in the partition */
- private long edgeCount;
- /**
- * Whether the partition is already processed in the current iteration
- * cycle
- */
- private boolean isProcessed;
-
- // ---- PARTITION ----
- /** the actual partition. Depending on the state of the partition,
- this object could be empty. */
- private Partition<I, V, E> partition;
-
- /**
- * Initialization of the metadata enriched partition.
- *
- * @param id id of the partition
- */
- public MetaPartition(int id) {
- this.id = id;
- this.state = State.INIT;
- this.vertexCount = 0;
- this.prevVertexCount = 0;
- this.edgeCount = 0;
- this.isProcessed = false;
-
- this.partition = null;
- }
-
- /**
- * @return the id
- */
- public int getId() {
- return id;
- }
-
- /**
- * @return the state
- */
- public State getState() {
- return state;
- }
-
- /**
- * This function sets the metadata for on-disk partition.
- */
- public void setOnDisk() {
- this.state = State.ON_DISK;
- this.vertexCount = partition.getVertexCount();
- this.edgeCount = partition.getEdgeCount();
- this.partition = null;
- }
-
- /**
- *
- * @param partition the partition associated to this container
- */
- public void setActive(Partition<I, V, E> partition) {
- if (partition != null) {
- this.partition = partition;
- }
- this.state = State.ACTIVE;
- this.prevVertexCount = this.vertexCount;
- this.vertexCount = 0;
- }
-
- /**
- * @param state the state to set
- */
- public void setState(State state) {
- this.state = state;
- }
-
- /**
- * set previous number of vertexes
- * @param vertexCount number of vertexes
- */
- public void setPrevVertexCount(long vertexCount) {
- this.prevVertexCount = vertexCount;
- }
-
- /**
- * @return the vertexCount
- */
- public long getPrevVertexCount() {
- return prevVertexCount;
- }
-
- /**
- * @return the vertexCount
- */
- public long getVertexCount() {
- return vertexCount;
- }
-
- /**
- * @return the edgeCount
- */
- public long getEdgeCount() {
- return edgeCount;
- }
-
- /**
- * @return true iff the partition is marked as processed.
- */
- public boolean isProcessed() {
- return isProcessed;
- }
-
- /**
- * Set the state of this partition in terms of being already processed or
- * not
- * @param isProcessed whether the partition is processed or not
- */
- public void setProcessed(boolean isProcessed) {
- this.isProcessed = isProcessed;
- }
-
- /**
- * @return the partition
- */
- public Partition<I, V, E> getPartition() {
- return partition;
- }
-
- /**
- * @param partition the partition to set
- */
- public void setPartition(Partition<I, V, E> partition) {
- this.partition = partition;
- }
-
- @Override
- public String toString() {
- StringBuffer sb = new StringBuffer();
-
- sb.append("Meta Data: { ");
- sb.append("ID: " + id + "; ");
- sb.append("State: " + state + "; ");
- sb.append("Number of Vertices: " + vertexCount + "; ");
- sb.append("Previous number of Vertices: " + prevVertexCount + "; ");
- sb.append("Number of edges: " + edgeCount + "; ");
- sb.append("Is processed: " + isProcessed + "; }");
- sb.append("Partition: " + partition + "; }");
-
- return sb.toString();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/fafecee7/giraph-core/src/main/java/org/apache/giraph/ooc/FixedOutOfCoreEngine.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/ooc/FixedOutOfCoreEngine.java b/giraph-core/src/main/java/org/apache/giraph/ooc/FixedOutOfCoreEngine.java
new file mode 100644
index 0000000..9324239
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/ooc/FixedOutOfCoreEngine.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.ooc;
+
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.ooc.data.MetaPartitionManager;
+import org.apache.giraph.ooc.io.IOCommand;
+import org.apache.giraph.ooc.io.LoadPartitionIOCommand;
+import org.apache.giraph.ooc.io.StorePartitionIOCommand;
+import org.apache.giraph.ooc.io.WaitIOCommand;
+import org.apache.log4j.Logger;
+
+/**
+ * Out-of-core engine maintaining fixed number of partitions in memory.
+ */
+public class FixedOutOfCoreEngine extends OutOfCoreEngine {
+ /** Class logger. */
+ private static final Logger LOG =
+ Logger.getLogger(FixedOutOfCoreEngine.class);
+ /**
+ * When getting partitions, how many milliseconds to wait if no partition was
+ * available in memory
+ */
+ private static final long MSEC_TO_WAIT = 1000;
+ /**
+ * Dummy object to wait on until a partition becomes available in memory
+ * for processing
+ */
+ private final Object partitionAvailable = new Object();
+
+ /**
+ * Constructor
+ *
+ * @param conf Configuration
+ * @param service Service worker
+ * @param maxPartitionsInMemory Maximum number of partitions that can be kept
+ * in memory
+ */
+ public FixedOutOfCoreEngine(ImmutableClassesGiraphConfiguration<?, ?, ?> conf,
+ CentralizedServiceWorker<?, ?, ?> service,
+ int maxPartitionsInMemory) {
+ super(conf, service);
+ this.ioScheduler = new FixedOutOfCoreIOScheduler(maxPartitionsInMemory,
+ numIOThreads, this, conf);
+ }
+
+ @Override
+ public Integer getNextPartition() {
+ Integer partitionId;
+ synchronized (partitionAvailable) {
+ while ((partitionId = metaPartitionManager.getNextPartition()) == null) {
+ try {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("getNextPartition: waiting until a partition becomes " +
+ "available!");
+ }
+ partitionAvailable.wait(MSEC_TO_WAIT);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("getNextPartition: caught " +
+ "InterruptedException while waiting to retrieve a partition to " +
+ "process");
+ }
+ if (jobFailed) {
+ throw new RuntimeException("Job Failed due to a failure in an " +
+ "out-of-core IO thread");
+ }
+ }
+ }
+ if (partitionId == MetaPartitionManager.NO_PARTITION_TO_PROCESS) {
+ partitionId = null;
+ }
+ return partitionId;
+ }
+
+ @Override
+ public void doneProcessingPartition(int partitionId) {
+ metaPartitionManager.setPartitionIsProcessed(partitionId);
+ // Put the partition in store IO command queue and announce this partition
+ // as a candidate to offload to disk.
+ if (LOG.isInfoEnabled()) {
+ LOG.info("doneProcessingPartition: processing partition " + partitionId +
+ " is done!");
+ }
+ ioScheduler.addIOCommand(new StorePartitionIOCommand(this, partitionId));
+ }
+
+ @Override
+ public void startIteration() {
+ getSuperstepLock().writeLock().lock();
+ metaPartitionManager.resetPartition();
+ ((FixedOutOfCoreIOScheduler) ioScheduler).clearStoreCommandQueue();
+ getSuperstepLock().writeLock().unlock();
+ }
+
+ @Override
+ public void retrievePartition(int partitionId) {
+ long superstep = service.getSuperstep();
+ if (metaPartitionManager.isPartitionOnDisk(partitionId)) {
+ ioScheduler.addIOCommand(new LoadPartitionIOCommand(this, partitionId,
+ superstep));
+ synchronized (partitionAvailable) {
+ while (metaPartitionManager.isPartitionOnDisk(partitionId)) {
+ try {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("retrievePartition: waiting until partition " +
+ partitionId + " becomes available");
+ }
+ partitionAvailable.wait();
+ } catch (InterruptedException e) {
+ throw new IllegalStateException("retrievePartition: caught " +
+ "InterruptedException while waiting to retrieve partition " +
+ partitionId);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void ioCommandCompleted(IOCommand command) {
+ if (command instanceof LoadPartitionIOCommand ||
+ command instanceof WaitIOCommand) {
+ // Notifying compute threads who are waiting for a partition to become
+ // available in memory to process.
+ synchronized (partitionAvailable) {
+ partitionAvailable.notifyAll();
+ }
+ }
+ }
+}