You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by cl...@apache.org on 2013/02/05 13:21:11 UTC
git commit: GIRAPH-461: Convert static assignment of in-memory
partitions with LRU cache
Updated Branches:
refs/heads/trunk af1c39b43 -> 62c12fa0b
GIRAPH-461: Convert static assignment of in-memory partitions with LRU cache
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/62c12fa0
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/62c12fa0
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/62c12fa0
Branch: refs/heads/trunk
Commit: 62c12fa0b899ee76962d6e3fa05298e2a23d5e68
Parents: af1c39b
Author: Claudio Martella <cl...@apache.org>
Authored: Tue Feb 5 13:02:27 2013 +0100
Committer: Claudio Martella <cl...@apache.org>
Committed: Tue Feb 5 13:02:27 2013 +0100
----------------------------------------------------------------------
CHANGELOG | 2 +
.../giraph/bsp/CentralizedServiceWorker.java | 19 -
.../giraph/comm/netty/NettyWorkerServer.java | 2 +
.../org/apache/giraph/graph/ComputeCallable.java | 2 +
.../giraph/partition/DiskBackedPartitionStore.java | 872 +++++++++++----
.../apache/giraph/partition/PartitionStore.java | 28 +-
.../giraph/partition/SimplePartitionStore.java | 3 +-
.../org/apache/giraph/worker/BspServiceWorker.java | 35 +-
.../java/org/apache/giraph/comm/RequestTest.java | 7 +-
.../giraph/partition/TestPartitionStores.java | 17 +-
10 files changed, 708 insertions(+), 279 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/62c12fa0/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index aec8fdf..2d25746 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-461: Convert static assignment of in-memory partitions with LRU cache (claudio)
+
GIRAPH-494: Make Edge an interface (nitay)
GIRAPH-492: Saving vertices has no status report, making it hard to
http://git-wip-us.apache.org/repos/asf/giraph/blob/62c12fa0/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
index 30d4462..71f8f72 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
@@ -34,8 +34,6 @@ import org.apache.giraph.partition.PartitionStore;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.giraph.vertex.Vertex;
-import org.apache.giraph.partition.Partition;
import org.apache.giraph.partition.PartitionOwner;
import org.apache.giraph.partition.PartitionStats;
import org.apache.giraph.worker.WorkerInfo;
@@ -135,15 +133,6 @@ public interface CentralizedServiceWorker<I extends WritableComparable,
List<PartitionStats> partitionStatsList);
/**
- * Get the partition that a vertex id would belong to.
- *
- * @param vertexId Id of the vertex that is used to find the correct
- * partition.
- * @return Correct partition if exists on this worker, null otherwise.
- */
- Partition<I, V, E, M> getPartition(I vertexId);
-
- /**
* Get the partition id that a vertex id would belong to.
*
* @param vertexId Vertex id
@@ -176,14 +165,6 @@ public interface CentralizedServiceWorker<I extends WritableComparable,
Iterable<? extends PartitionOwner> getPartitionOwners();
/**
- * Look up a vertex on a worker given its vertex index.
- *
- * @param vertexId Vertex index to look for
- * @return Vertex if it exists on this worker.
- */
- Vertex<I, V, E, M> getVertex(I vertexId);
-
- /**
* If desired by the user, vertex partitions are redistributed among
* workers according to the chosen WorkerGraphPartitioner.
*
http://git-wip-us.apache.org/repos/asf/giraph/blob/62c12fa0/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
index e2866fd..1b7cc54 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyWorkerServer.java
@@ -184,6 +184,7 @@ public class NettyWorkerServer<I extends WritableComparable,
}
}
}
+ service.getPartitionStore().putPartition(partition);
}
}
// Resolve all graph mutations
@@ -226,6 +227,7 @@ public class NettyWorkerServer<I extends WritableComparable,
partition.removeVertex(originalVertex.getId());
}
}
+ service.getPartitionStore().putPartition(partition);
}
if (!serverData.getVertexMutations().isEmpty()) {
throw new IllegalStateException("resolveMutations: Illegally " +
http://git-wip-us.apache.org/repos/asf/giraph/blob/62c12fa0/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
index 042fd47..a87561d 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
@@ -157,6 +157,8 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
} catch (IOException e) {
throw new IllegalStateException("call: Caught unexpected IOException," +
" failing.", e);
+ } finally {
+ serviceWorker.getPartitionStore().putPartition(partition);
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/62c12fa0/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
index 09e5d75..844a229 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/DiskBackedPartitionStore.java
@@ -18,69 +18,106 @@
package org.apache.giraph.partition;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.vertex.Vertex;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.log4j.Logger;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import com.google.common.collect.Sets;
/**
- * A partition store that can possibly spill to disk.
+ * Disk-backed PartitionStore. Partitions are stored in memory on a LRU basis.
+ * Thread-safe, but expects the caller to synchronized between deletes, adds,
+ * puts and gets.
*
* @param <I> Vertex id
* @param <V> Vertex data
* @param <E> Edge data
* @param <M> Message data
*/
+@SuppressWarnings("rawtypes")
public class DiskBackedPartitionStore<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
extends PartitionStore<I, V, E, M> {
/** Class logger. */
private static final Logger LOG =
Logger.getLogger(DiskBackedPartitionStore.class);
- /** Map of partitions kept in memory. */
- private final ConcurrentMap<Integer, Partition<I, V, E, M>>
- inMemoryPartitions = new ConcurrentHashMap<Integer, Partition<I, V, E, M>>();
- /** Maximum number of partitions to keep in memory. */
- private int maxInMemoryPartitions;
- /** Map of partitions kept out-of-core. The values are partition sizes. */
- private final ConcurrentMap<Integer, Integer> onDiskPartitions =
- Maps.newConcurrentMap();
- /** Directory on the local file system for storing out-of-core partitions. */
- private final String basePath;
- /** Configuration. */
+ /** States the partition can be found in */
+ private enum State { ACTIVE, INACTIVE, LOADING, OFFLOADING, ONDISK };
+ /** Global lock to the whole partition */
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ /**
+ * Global write lock. Must be hold to modify class state for read and write.
+ * Conditions are bond to this lock.
+ */
+ private final Lock wLock = lock.writeLock();
+ /** The ids of the partitions contained in the store */
+ private final Set<Integer> partitionIds = Sets.newHashSet();
+ /** Partitions' states store */
+ private final Map<Integer, State> states = Maps.newHashMap();
+ /** Current active partitions, which have not been put back yet */
+ private final Map<Integer, Partition<I, V, E, M>> active = Maps.newHashMap();
+ /** Inactive partitions to re-activate or spill to disk to make space */
+ private final Map<Integer, Partition<I, V, E, M>> inactive =
+ Maps.newLinkedHashMap();
+ /** Ids of partitions stored on disk and number of vertices contained */
+ private final Map<Integer, Integer> onDisk = Maps.newHashMap();
+ /** Per-partition users counters (clearly only for active partitions) */
+ private final Map<Integer, Integer> counters = Maps.newHashMap();
+ /** These Conditions are used to partitions' change of state */
+ private final Map<Integer, Condition> pending = Maps.newHashMap();
+ /**
+ * Used to signal threads waiting to load partitions. Can be used when new
+ * inactive partitions are avaiable, or when free slots are available.
+ */
+ private final Condition notEmpty = wLock.newCondition();
+ /** Executors for users requests. Uses caller threads */
+ private final ExecutorService pool = new DirectExecutorService();
+ /** Giraph configuration */
private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
- /** Slot for loading out-of-core partitions. */
- private Partition<I, V, E, M> loadedPartition;
- /** Locks for accessing and modifying partitions. */
- private final ConcurrentMap<Integer, Lock> partitionLocks =
- Maps.newConcurrentMap();
- /** Context used to report progress */
- private final Mapper<?, ?, ?, ?>.Context context;
+ /** Mapper context */
+ private final Context context;
+ /** Base path where the partition files are written to */
+ private final String basePath;
+ /** Maximum number of slots */
+ private final int maxInMemoryPartitions;
+ /** Number of slots used */
+ private int inMemoryPartitions;
/**
- * Constructor.
+ * Constructor
*
* @param conf Configuration
- * @param context Mapper context
+ * @param context Context
*/
public DiskBackedPartitionStore(
ImmutableClassesGiraphConfiguration<I, V, E, M> conf,
@@ -96,76 +133,233 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
GiraphConstants.PARTITIONS_DIRECTORY_DEFAULT);
}
- /**
- * Get the path to the file where a partition is stored.
- *
- * @param partitionId The partition
- * @return The path to the given partition
- */
- private String getPartitionPath(Integer partitionId) {
- return basePath + "/partition-" + partitionId;
+ @Override
+ public Iterable<Integer> getPartitionIds() {
+ try {
+ return pool.submit(new Callable<Iterable<Integer>>() {
+
+ @Override
+ public Iterable<Integer> call() throws Exception {
+ wLock.lock();
+ try {
+ return Iterables.unmodifiableIterable(partitionIds);
+ } finally {
+ wLock.unlock();
+ }
+ }
+ }).get();
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "getPartitionIds: cannot retrieve partition ids", e);
+ } catch (ExecutionException e) {
+ throw new IllegalStateException(
+ "getPartitionIds: cannot retrieve partition ids", e);
+ }
}
- /**
- * Create a new lock for a partition, lock it, and return it. If already
- * existing, return null.
- *
- * @param partitionId Partition id
- * @return A newly created lock, or null if already present
- */
- private Lock createLock(Integer partitionId) {
- Lock lock = new ReentrantLock(true);
- lock.lock();
- if (partitionLocks.putIfAbsent(partitionId, lock) != null) {
- return null;
+ @Override
+ public boolean hasPartition(final Integer id) {
+ try {
+ return pool.submit(new Callable<Boolean>() {
+
+ @Override
+ public Boolean call() throws Exception {
+ wLock.lock();
+ try {
+ return partitionIds.contains(id);
+ } finally {
+ wLock.unlock();
+ }
+ }
+ }).get();
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "hasPartition: cannot check partition", e);
+ } catch (ExecutionException e) {
+ throw new IllegalStateException(
+ "hasPartition: cannot check partition", e);
+ }
+ }
+
+ @Override
+ public int getNumPartitions() {
+ try {
+ return pool.submit(new Callable<Integer>() {
+
+ @Override
+ public Integer call() throws Exception {
+ wLock.lock();
+ try {
+ return partitionIds.size();
+ } finally {
+ wLock.unlock();
+ }
+ }
+ }).get();
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "getNumPartitions: cannot retrieve partition ids", e);
+ } catch (ExecutionException e) {
+ throw new IllegalStateException(
+ "getNumPartitions: cannot retrieve partition ids", e);
+ }
+ }
+
+ @Override
+ public Partition<I, V, E, M> getPartition(Integer id) {
+ try {
+ return pool.submit(new GetPartition(id)).get();
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "getPartition: cannot retrieve partition " + id, e);
+ } catch (ExecutionException e) {
+ throw new IllegalStateException(
+ "getPartition: cannot retrieve partition " + id, e);
+ }
+ }
+
+ @Override
+ public void putPartition(Partition<I, V, E, M> partition) {
+ Integer id = partition.getId();
+ try {
+ pool.submit(new PutPartition(id, partition)).get();
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "putPartition: cannot put back partition " + id, e);
+ } catch (ExecutionException e) {
+ throw new IllegalStateException(
+ "putPartition: cannot put back partition " + id, e);
}
- return lock;
+ }
+
+ @Override
+ public void deletePartition(Integer id) {
+ try {
+ pool.submit(new DeletePartition(id)).get();
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "deletePartition: cannot delete partition " + id, e);
+ } catch (ExecutionException e) {
+ throw new IllegalStateException(
+ "deletePartition: cannot delete partition " + id, e);
+ }
+ }
+
+ @Override
+ public Partition<I, V, E, M> removePartition(Integer id) {
+ Partition<I, V, E, M> partition = getPartition(id);
+ // we put it back, so the partition can turn INACTIVE and be deleted.
+ putPartition(partition);
+ deletePartition(id);
+ return partition;
+ }
+
+ @Override
+ public void addPartition(Partition<I, V, E, M> partition) {
+ Integer id = partition.getId();
+ try {
+ pool.submit(new AddPartition(partition.getId(), partition)).get();
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(
+ "addPartition: cannot add partition " + id, e);
+ } catch (ExecutionException e) {
+ throw new IllegalStateException(
+ "addPartition: cannot add partition " + id, e);
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ try {
+ pool.shutdown();
+ try {
+ if (!pool.awaitTermination(120, TimeUnit.SECONDS)) {
+ pool.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ pool.shutdownNow();
+ }
+ } finally {
+ for (Integer id : onDisk.values()) {
+ deletePartitionFile(id);
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append(partitionIds.toString());
+ sb.append("\nActive\n");
+ for (Entry<Integer, Partition<I, V, E, M>> e : active.entrySet()) {
+ sb.append(e.getKey() + ":" + e.getValue() + "\n");
+ }
+ sb.append("Inactive\n");
+ for (Entry<Integer, Partition<I, V, E, M>> e : inactive.entrySet()) {
+ sb.append(e.getKey() + ":" + e.getValue() + "\n");
+ }
+ sb.append("OnDisk\n");
+ for (Entry<Integer, Integer> e : onDisk.entrySet()) {
+ sb.append(e.getKey() + ":" + e.getValue() + "\n");
+ }
+ sb.append("Counters\n");
+ for (Entry<Integer, Integer> e : counters.entrySet()) {
+ sb.append(e.getKey() + ":" + e.getValue() + "\n");
+ }
+ sb.append("Pending\n");
+ for (Entry<Integer, Condition> e : pending.entrySet()) {
+ sb.append(e.getKey() + "\n");
+ }
+ return sb.toString();
}
/**
- * Get the lock for a partition id.
+ * Increment the number of active users for a partition. Caller should hold
+ * the global write lock.
*
- * @param partitionId Partition id
- * @return The lock
+ * @param id The id of the counter to increment
+ * @return The new value
*/
- private Lock getLock(Integer partitionId) {
- return partitionLocks.get(partitionId);
+ private Integer incrementCounter(Integer id) {
+ Integer count = counters.get(id);
+ if (count == null) {
+ count = 0;
+ }
+ counters.put(id, ++count);
+ return count;
}
/**
- * Write a partition to disk.
+ * Decrement the number of active users for a partition. Caller should hold
+ * the global write lock.
*
- * @param partition The partition object to write
- * @throws java.io.IOException
+ * @param id The id of the counter to decrement
+ * @return The new value
*/
- private void writePartition(Partition<I, V, E, M> partition)
- throws IOException {
- File file = new File(getPartitionPath(partition.getId()));
- file.getParentFile().mkdirs();
- file.createNewFile();
- DataOutputStream outputStream = new DataOutputStream(
- new BufferedOutputStream(new FileOutputStream(file)));
- for (Vertex<I, V, E, M> vertex : partition) {
- vertex.write(outputStream);
+ private Integer decrementCounter(Integer id) {
+ Integer count = counters.get(id);
+ if (count == null) {
+ throw new IllegalStateException("no counter for partition " + id);
}
- outputStream.close();
+ counters.put(id, --count);
+ return count;
}
/**
- * Read a partition from disk.
+ * Load a partition from disk. It deletes the file after the load.
*
- * @param partitionId Id of the partition to read
- * @return The partition object
+ * @param id The id of the partition to load
+ * @param numVertices The number of vertices contained on disk
+ * @return The partition
* @throws IOException
*/
- private Partition<I, V, E, M> readPartition(Integer partitionId)
+ private Partition<I, V, E, M> loadPartition(Integer id, int numVertices)
throws IOException {
Partition<I, V, E, M> partition =
- conf.createPartition(partitionId, context);
- File file = new File(getPartitionPath(partitionId));
+ conf.createPartition(id, context);
+ File file = new File(getPartitionPath(id));
DataInputStream inputStream = new DataInputStream(
new BufferedInputStream(new FileInputStream(file)));
- int numVertices = onDiskPartitions.get(partitionId);
for (int i = 0; i < numVertices; ++i) {
Vertex<I, V, E, M> vertex = conf.createVertex();
vertex.readFields(inputStream);
@@ -177,16 +371,22 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
}
/**
- * Append some vertices of another partition to an out-of-core partition.
+ * Write a partition to disk.
*
- * @param partition Partition to add
+ * @param partition The partition to offload
* @throws IOException
*/
- private void appendPartitionOutOfCore(Partition<I, V, E, M> partition)
+ private void offloadPartition(Partition<I, V, E, M> partition)
throws IOException {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("offloadPartition: writing partition " + partition.getId() +
+ " to disk.");
+ }
File file = new File(getPartitionPath(partition.getId()));
+ file.getParentFile().mkdirs();
+ file.createNewFile();
DataOutputStream outputStream = new DataOutputStream(
- new BufferedOutputStream(new FileOutputStream(file, true)));
+ new BufferedOutputStream(new FileOutputStream(file)));
for (Vertex<I, V, E, M> vertex : partition) {
vertex.write(outputStream);
}
@@ -194,171 +394,407 @@ public class DiskBackedPartitionStore<I extends WritableComparable,
}
/**
- * Load an out-of-core partition in memory.
+ * Append a partition on disk at the end of the file. Expects the caller
+ * to hold the global lock.
*
- * @param partitionId Partition id
+ * @param partition The partition
+ * @throws IOException
*/
- private void loadPartition(Integer partitionId) {
- if (loadedPartition != null) {
- if (loadedPartition.getId() == partitionId) {
- return;
- }
- if (LOG.isInfoEnabled()) {
- LOG.info("loadPartition: moving partition " + loadedPartition.getId() +
- " out of core with size " + loadedPartition.getVertexCount());
- }
- try {
- writePartition(loadedPartition);
- onDiskPartitions.put(loadedPartition.getId(),
- (int) loadedPartition.getVertexCount());
- loadedPartition = null;
- } catch (IOException e) {
- throw new IllegalStateException("loadPartition: failed writing " +
- "partition " + loadedPartition.getId() + " to disk", e);
- }
- }
- if (LOG.isInfoEnabled()) {
- LOG.info("loadPartition: loading partition " + partitionId +
- " in memory");
- }
- try {
- loadedPartition = readPartition(partitionId);
- } catch (IOException e) {
- throw new IllegalStateException("loadPartition: failed reading " +
- "partition " + partitionId + " from disk");
+ private void addToOOCPartition(Partition<I, V, E, M> partition)
+ throws IOException {
+ Integer id = partition.getId();
+ Integer count = onDisk.get(id);
+ onDisk.put(id, count + (int) partition.getVertexCount());
+ File file = new File(getPartitionPath(id));
+ DataOutputStream outputStream = new DataOutputStream(
+ new BufferedOutputStream(new FileOutputStream(file, true)));
+ for (Vertex<I, V, E, M> vertex : partition) {
+ vertex.write(outputStream);
}
+ outputStream.close();
}
/**
- * Add a new partition without requiring a lock.
+ * Delete a partition file
*
- * @param partition Partition to be added
+ * @param id The id of the partition owning the file.
*/
- private void addPartitionNoLock(Partition<I, V, E, M> partition) {
- synchronized (inMemoryPartitions) {
- if (inMemoryPartitions.size() + 1 < maxInMemoryPartitions) {
- inMemoryPartitions.put(partition.getId(), partition);
+ public void deletePartitionFile(Integer id) {
+ File file = new File(getPartitionPath(id));
+ file.delete();
+ }
- return;
- }
+ /**
+ * Get the path to the file where a partition is stored.
+ *
+ * @param partitionId The partition
+ * @return The path to the given partition
+ */
+ private String getPartitionPath(Integer partitionId) {
+ return basePath + "/partition-" + partitionId;
+ }
+
+ /**
+ * Task that gets a partition from the store
+ */
+ private class GetPartition implements Callable<Partition<I, V, E, M>> {
+ /** Partition id */
+ private Integer id;
+
+ /**
+ * Constructor
+ *
+ * @param id Partition id
+ */
+ public GetPartition(Integer id) {
+ this.id = id;
}
- try {
- writePartition(partition);
- onDiskPartitions.put(partition.getId(),
- (int) partition.getVertexCount());
- } catch (IOException e) {
- throw new IllegalStateException("addPartition: failed writing " +
- "partition " + partition.getId() + "to disk");
+
+ /**
+ * Removes and returns the last recently used entry.
+ *
+ * @return The last recently used entry.
+ */
+ private Entry<Integer, Partition<I, V, E, M>> getLRUEntry() {
+ Iterator<Entry<Integer, Partition<I, V, E, M>>> i =
+ inactive.entrySet().iterator();
+ Entry<Integer, Partition<I, V, E, M>> lruEntry = i.next();
+ i.remove();
+ return lruEntry;
}
- }
- @Override
- public void addPartition(Partition<I, V, E, M> partition) {
- if (inMemoryPartitions.containsKey(partition.getId())) {
- Partition<I, V, E, M> existingPartition =
- inMemoryPartitions.get(partition.getId());
- existingPartition.addPartition(partition);
- } else if (onDiskPartitions.containsKey(partition.getId())) {
- Lock lock = getLock(partition.getId());
- lock.lock();
- if (loadedPartition != null && loadedPartition.getId() ==
- partition.getId()) {
- loadedPartition.addPartition(partition);
- } else {
+ @Override
+ public Partition<I, V, E, M> call() throws Exception {
+ Partition<I, V, E, M> partition = null;
+
+ while (partition == null) {
+ wLock.lock();
try {
- appendPartitionOutOfCore(partition);
- onDiskPartitions.put(partition.getId(),
- onDiskPartitions.get(partition.getId()) +
- (int) partition.getVertexCount());
- } catch (IOException e) {
- throw new IllegalStateException("addPartition: failed " +
- "writing vertices to partition " + partition.getId() + " on disk",
- e);
+ State pState = states.get(id);
+ switch (pState) {
+ case ONDISK:
+ Entry<Integer, Partition<I, V, E, M>> lru = null;
+ states.put(id, State.LOADING);
+ int numVertices = onDisk.remove(id);
+ /*
+ * Wait until we have space in memory or inactive data for a switch
+ */
+ while (inMemoryPartitions >= maxInMemoryPartitions &&
+ inactive.size() == 0) {
+ notEmpty.await();
+ }
+ /*
+ * we have to make some space first
+ */
+ if (inMemoryPartitions >= maxInMemoryPartitions) {
+ lru = getLRUEntry();
+ states.put(lru.getKey(), State.OFFLOADING);
+ pending.get(lru.getKey()).signalAll();
+ } else { // there is space, just add it to the in-memory partitions
+ inMemoryPartitions++;
+ }
+ /*
+ * do IO without contention, the threads interested to these
+ * partitions will subscribe to the relative Condition.
+ */
+ wLock.unlock();
+ if (lru != null) {
+ offloadPartition(lru.getValue());
+ }
+ partition = loadPartition(id, numVertices);
+ wLock.lock();
+ /*
+ * update state and signal the pending threads
+ */
+ if (lru != null) {
+ states.put(lru.getKey(), State.ONDISK);
+ onDisk.put(lru.getKey(), (int) lru.getValue().getVertexCount());
+ pending.get(lru.getKey()).signalAll();
+ }
+ active.put(id, partition);
+ states.put(id, State.ACTIVE);
+ pending.get(id).signalAll();
+ incrementCounter(id);
+ break;
+ case INACTIVE:
+ partition = inactive.remove(id);
+ active.put(id, partition);
+ states.put(id, State.ACTIVE);
+ incrementCounter(id);
+ break;
+ case ACTIVE:
+ partition = active.get(id);
+ incrementCounter(id);
+ break;
+ case LOADING:
+ pending.get(id).await();
+ break;
+ case OFFLOADING:
+ pending.get(id).await();
+ break;
+ default:
+ throw new IllegalStateException(
+ "illegal state " + pState + " for partition " + id);
+ }
+ } finally {
+ wLock.unlock();
}
}
- lock.unlock();
- } else {
- Lock lock = createLock(partition.getId());
- if (lock != null) {
- addPartitionNoLock(partition);
- lock.unlock();
- } else {
- // Another thread is already creating the partition,
- // so we make sure it's done before repeating the call.
- lock = getLock(partition.getId());
- lock.lock();
- lock.unlock();
- addPartition(partition);
- }
+ return partition;
}
}
- @Override
- public Partition<I, V, E, M> getPartition(Integer partitionId) {
- if (inMemoryPartitions.containsKey(partitionId)) {
- return inMemoryPartitions.get(partitionId);
- } else if (onDiskPartitions.containsKey(partitionId)) {
- loadPartition(partitionId);
- return loadedPartition;
- } else {
- throw new IllegalStateException("getPartition: partition " +
- partitionId + " does not exist");
+ /**
+ * Task that puts a partition back to the store
+ */
+ private class PutPartition implements Callable<Void> {
+ /** Partition id */
+ private Integer id;
+
+ /**
+ * Constructor
+ *
+ * @param id The partition id
+ * @param partition The partition
+ */
+ public PutPartition(Integer id, Partition<I, V, E, M> partition) {
+ this.id = id;
}
- }
- @Override
- public Partition<I, V, E, M> removePartition(Integer partitionId) {
- partitionLocks.remove(partitionId);
- if (onDiskPartitions.containsKey(partitionId)) {
- Partition<I, V, E, M> partition;
- if (loadedPartition != null && loadedPartition.getId() == partitionId) {
- partition = loadedPartition;
- loadedPartition = null;
- } else {
- try {
- partition = readPartition(partitionId);
- } catch (IOException e) {
- throw new IllegalStateException("removePartition: failed reading " +
- "partition " + partitionId + " from disk", e);
+ @Override
+ public Void call() throws Exception {
+ wLock.lock();
+ try {
+ if (decrementCounter(id) == 0) {
+ inactive.put(id, active.remove(id));
+ states.put(id, State.INACTIVE);
+ pending.get(id).signalAll();
+ notEmpty.signal();
}
+ return null;
+ } finally {
+ wLock.unlock();
}
- onDiskPartitions.remove(partitionId);
- return partition;
- } else {
- return inMemoryPartitions.remove(partitionId);
}
}
- @Override
- public void deletePartition(Integer partitionId) {
- partitionLocks.remove(partitionId);
- if (inMemoryPartitions.containsKey(partitionId)) {
- inMemoryPartitions.remove(partitionId);
- } else {
- if (loadedPartition != null && loadedPartition.getId() == partitionId) {
- loadedPartition = null;
- } else {
- File file = new File(getPartitionPath(partitionId));
- file.delete();
+ /**
+ * Task that adds a partition to the store
+ */
+ private class AddPartition implements Callable<Void> {
+ /** Partition id */
+ private Integer id;
+ /** Partition */
+ private Partition<I, V, E, M> partition;
+
+ /**
+ * Constructor
+ *
+ * @param id The partition id
+ * @param partition The partition
+ */
+ public AddPartition(Integer id, Partition<I, V, E, M> partition) {
+ this.id = id;
+ this.partition = partition;
+ }
+
+ @Override
+ public Void call() throws Exception {
+
+ wLock.lock();
+ try {
+ if (partitionIds.contains(id)) {
+ Partition<I, V, E, M> existing = null;
+ boolean isOOC = false;
+ boolean done = false;
+ while (!done) {
+ State pState = states.get(id);
+ switch (pState) {
+ case ONDISK:
+ isOOC = true;
+ done = true;
+ break;
+ /*
+ * just add data to the in-memory partitions,
+ * concurrency should be managed by the caller.
+ */
+ case INACTIVE:
+ existing = inactive.get(id);
+ done = true;
+ break;
+ case ACTIVE:
+ existing = active.get(id);
+ done = true;
+ break;
+ case LOADING:
+ pending.get(id).await();
+ break;
+ case OFFLOADING:
+ pending.get(id).await();
+ break;
+ default:
+ throw new IllegalStateException(
+ "illegal state " + pState + " for partition " + id);
+ }
+ }
+ if (isOOC) {
+ addToOOCPartition(partition);
+ } else {
+ existing.addPartition(partition);
+ }
+ } else {
+ Condition newC = wLock.newCondition();
+ pending.put(id, newC);
+ partitionIds.add(id);
+ if (inMemoryPartitions < maxInMemoryPartitions) {
+ inMemoryPartitions++;
+ states.put(id, State.INACTIVE);
+ inactive.put(id, partition);
+ notEmpty.signal();
+ } else {
+ states.put(id, State.OFFLOADING);
+ onDisk.put(id, (int) partition.getVertexCount());
+ wLock.unlock();
+ offloadPartition(partition);
+ wLock.lock();
+ states.put(id, State.ONDISK);
+ newC.signalAll();
+ }
+ }
+ return null;
+ } finally {
+ wLock.unlock();
}
- onDiskPartitions.remove(partitionId);
}
}
- @Override
- public boolean hasPartition(Integer partitionId) {
- return partitionLocks.containsKey(partitionId);
- }
+ /**
+ * Task that deletes a partition to the store
+ */
+ private class DeletePartition implements Callable<Void> {
+ /** Partition id */
+ private Integer id;
- @Override
- public Iterable<Integer> getPartitionIds() {
- return Iterables.concat(inMemoryPartitions.keySet(),
- onDiskPartitions.keySet());
- }
+ /**
+ * Constructor
+ *
+ * @param id The partition id
+ */
+ public DeletePartition(Integer id) {
+ this.id = id;
+ }
- @Override
- public int getNumPartitions() {
- return partitionLocks.size();
+ @Override
+ public Void call() throws Exception {
+ boolean done = false;
+
+ wLock.lock();
+ try {
+ while (!done) {
+ State pState = states.get(id);
+ switch (pState) {
+ case ONDISK:
+ onDisk.remove(id);
+ deletePartitionFile(id);
+ done = true;
+ break;
+ case INACTIVE:
+ inactive.remove(id);
+ inMemoryPartitions--;
+ notEmpty.signal();
+ done = true;
+ break;
+ case ACTIVE:
+ pending.get(id).await();
+ break;
+ case LOADING:
+ pending.get(id).await();
+ break;
+ case OFFLOADING:
+ pending.get(id).await();
+ break;
+ default:
+ throw new IllegalStateException(
+ "illegal state " + pState + " for partition " + id);
+ }
+ }
+ partitionIds.remove(id);
+ states.remove(id);
+ counters.remove(id);
+ pending.remove(id).signalAll();
+ return null;
+ } finally {
+ wLock.unlock();
+ }
+ }
}
+ /**
+ * Direct Executor that executes tasks within the calling threads.
+ */
+ private class DirectExecutorService extends AbstractExecutorService {
+ /** Executor state */
+ private volatile boolean shutdown = false;
+
+ /**
+ * Constructor
+ */
+ public DirectExecutorService() { }
+
+ /**
+ * Execute the task in the calling thread.
+ *
+ * @param task Task to execute
+ */
+ public void execute(Runnable task) {
+ task.run();
+ }
+
+ /**
+ * Shutdown the executor.
+ */
+ public void shutdown() {
+ this.shutdown = true;
+ }
+
+ /**
+ * Shutdown the executor and return the current queue (empty).
+ *
+ * @return The list of awaiting tasks
+ */
+ public List<Runnable> shutdownNow() {
+ this.shutdown = true;
+ return Collections.emptyList();
+ }
+
+ /**
+ * Return current shutdown state.
+ *
+ * @return Shutdown state
+ */
+ public boolean isShutdown() {
+ return shutdown;
+ }
+
+ /**
+ * Return current termination state.
+ *
+ * @return Termination state
+ */
+ public boolean isTerminated() {
+ return shutdown;
+ }
+
+ /**
+ * Do nothing and return shutdown state.
+ *
+ * @param timeout Timeout
+ * @param unit Time unit
+ * @return Shutdown state
+ */
+ public boolean awaitTermination(long timeout, TimeUnit unit)
+ throws InterruptedException {
+ return shutdown;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/62c12fa0/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java
index 3e8dda9..4206ce3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionStore.java
@@ -18,8 +18,6 @@
package org.apache.giraph.partition;
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -43,7 +41,8 @@ public abstract class PartitionStore<I extends WritableComparable,
public abstract void addPartition(Partition<I, V, E, M> partition);
/**
- * Get a partition.
+ * Get a partition. Note: user has to put back it to the store through
+ * {@link #putPartition(Integer, Partition)} after use.
*
* @param partitionId Partition id
* @return The requested partition
@@ -51,6 +50,14 @@ public abstract class PartitionStore<I extends WritableComparable,
public abstract Partition<I, V, E, M> getPartition(Integer partitionId);
/**
+ * Put a partition back to the store. Use this method to be put a partition
+ * back after it has been retrieved through {@link #getPartition(Integer)}.
+ *
+ * @param partition Partition
+ */
+ public abstract void putPartition(Partition<I, V, E, M> partition);
+
+ /**
* Remove a partition and return it.
*
* @param partitionId Partition id
@@ -99,18 +106,7 @@ public abstract class PartitionStore<I extends WritableComparable,
}
/**
- * Return all the stored partitions as an Iterable. Note that this may force
- * out-of-core partitions to be loaded into memory if using out-of-core.
- *
- * @return The partition objects
+ * Called at the end of the computation.
*/
- public Iterable<Partition<I, V, E, M>> getPartitions() {
- return Iterables.transform(getPartitionIds(),
- new Function<Integer, Partition<I, V, E, M>>() {
- @Override
- public Partition<I, V, E, M> apply(Integer partitionId) {
- return getPartition(partitionId);
- }
- });
- }
+ public void shutdown() { }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/62c12fa0/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java
index 7bd0bb1..74cc3a7 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionStore.java
@@ -101,5 +101,6 @@ public class SimplePartitionStore<I extends WritableComparable,
return partitions.size();
}
-
+ @Override
+ public void putPartition(Partition<I, V, E, M> partition) { }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/62c12fa0/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index f542344..a48c5ea 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -546,8 +546,9 @@ else[HADOOP_NON_SECURE]*/
// if necessary
List<PartitionStats> partitionStatsList =
new ArrayList<PartitionStats>();
- for (Partition<I, V, E, M> partition :
- getPartitionStore().getPartitions()) {
+ for (Integer partitionId : getPartitionStore().getPartitionIds()) {
+ Partition<I, V, E, M> partition =
+ getPartitionStore().getPartition(partitionId);
PartitionStats partitionStats =
new PartitionStats(partition.getId(),
partition.getVertexCount(),
@@ -555,6 +556,7 @@ else[HADOOP_NON_SECURE]*/
partition.getEdgeCount(),
0);
partitionStatsList.add(partitionStats);
+ getPartitionStore().putPartition(partition);
}
workerGraphPartitioner.finalizePartitionStats(
partitionStatsList, getPartitionStore());
@@ -894,8 +896,9 @@ else[HADOOP_NON_SECURE]*/
long nextPrintMsecs = System.currentTimeMillis() + 15000;
int partitionIndex = 0;
int numPartitions = getPartitionStore().getNumPartitions();
- for (Partition<I, V, E, M> partition :
- getPartitionStore().getPartitions()) {
+ for (Integer partitionId : getPartitionStore().getPartitionIds()) {
+ Partition<I, V, E, M> partition =
+ getPartitionStore().getPartition(partitionId);
for (Vertex<I, V, E, M> vertex : partition) {
getContext().progress();
vertexWriter.writeVertex(vertex);
@@ -914,6 +917,7 @@ else[HADOOP_NON_SECURE]*/
nextPrintVertices = verticesWritten + 250000;
}
}
+ getPartitionStore().putPartition(partition);
getContext().progress();
++partitionIndex;
}
@@ -928,6 +932,7 @@ else[HADOOP_NON_SECURE]*/
workerClient.closeConnections();
setCachedSuperstep(getSuperstep() - 1);
saveVertices(finishedSuperstepStats.getLocalVertexCount());
+ getPartitionStore().shutdown();
// All worker processes should denote they are done by adding special
// znode. Once the number of znodes equals the number of partitions
// for workers and masters, the master will clean up the ZooKeeper
@@ -1017,8 +1022,9 @@ else[HADOOP_NON_SECURE]*/
getFs().create(verticesFilePath);
ByteArrayOutputStream metadataByteStream = new ByteArrayOutputStream();
DataOutput metadataOutput = new DataOutputStream(metadataByteStream);
- for (Partition<I, V, E, M> partition :
- getPartitionStore().getPartitions()) {
+ for (Integer partitionId : getPartitionStore().getPartitionIds()) {
+ Partition<I, V, E, M> partition =
+ getPartitionStore().getPartition(partitionId);
long startPos = verticesOutputStream.getPos();
partition.write(verticesOutputStream);
// write messages
@@ -1037,6 +1043,7 @@ else[HADOOP_NON_SECURE]*/
(verticesOutputStream.getPos() - startPos) +
", partition = " + partition.toString());
}
+ getPartitionStore().putPartition(partition);
getContext().progress();
}
// Metadata is buffered and written at the end since it's small and
@@ -1388,11 +1395,6 @@ else[HADOOP_NON_SECURE]*/
}
@Override
- public Partition<I, V, E, M> getPartition(I vertexId) {
- return getPartitionStore().getPartition(getPartitionId(vertexId));
- }
-
- @Override
public Integer getPartitionId(I vertexId) {
PartitionOwner partitionOwner = getVertexPartitionOwner(vertexId);
return partitionOwner.getPartitionId();
@@ -1404,17 +1406,6 @@ else[HADOOP_NON_SECURE]*/
}
@Override
- public Vertex<I, V, E, M> getVertex(I vertexId) {
- PartitionOwner partitionOwner = getVertexPartitionOwner(vertexId);
- if (getPartitionStore().hasPartition(partitionOwner.getPartitionId())) {
- return getPartitionStore().getPartition(
- partitionOwner.getPartitionId()).getVertex(vertexId);
- } else {
- return null;
- }
- }
-
- @Override
public ServerData<I, V, E, M> getServerData() {
return workerServer.getServerData();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/62c12fa0/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
index 7187928..d779fe4 100644
--- a/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
+++ b/giraph-core/src/test/java/org/apache/giraph/comm/RequestTest.java
@@ -137,12 +137,15 @@ public class RequestTest {
serverData.getPartitionStore();
assertTrue(partitionStore.hasPartition(partitionId));
int total = 0;
+ Partition<IntWritable, IntWritable, IntWritable, IntWritable> partition2 =
+ partitionStore.getPartition(partitionId);
for (Vertex<IntWritable, IntWritable,
- IntWritable, IntWritable> vertex :
- partitionStore.getPartition(partitionId)) {
+ IntWritable, IntWritable> vertex : partition2) {
total += vertex.getId().get();
}
+ partitionStore.putPartition(partition2);
assertEquals(total, 45);
+ partitionStore.shutdown();
}
@Test
http://git-wip-us.apache.org/repos/asf/giraph/blob/62c12fa0/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
index b02ed3a..b4cddf6 100644
--- a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
+++ b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
@@ -34,6 +34,8 @@ import org.junit.Test;
import com.google.common.collect.Iterables;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -80,6 +82,7 @@ public class TestPartitionStores {
partitionStore = new SimplePartitionStore<IntWritable, IntWritable,
NullWritable, IntWritable>(conf, context);
testReadWrite(partitionStore, conf);
+ partitionStore.shutdown();
}
@Test
@@ -137,11 +140,13 @@ public class TestPartitionStores {
partitionStore = new DiskBackedPartitionStore<IntWritable,
IntWritable, NullWritable, IntWritable>(conf, context);
testReadWrite(partitionStore, conf);
+ partitionStore.shutdown();
conf.setInt(GiraphConstants.MAX_PARTITIONS_IN_MEMORY, 2);
partitionStore = new DiskBackedPartitionStore<IntWritable,
IntWritable, NullWritable, IntWritable>(conf, context);
testReadWrite(partitionStore, conf);
+ partitionStore.shutdown();
}
/**
@@ -185,16 +190,26 @@ public class TestPartitionStores {
Partition<IntWritable, IntWritable, NullWritable, IntWritable> partition1 =
partitionStore.getPartition(1);
+ partitionStore.putPartition(partition1);
Partition<IntWritable, IntWritable, NullWritable, IntWritable> partition2 =
partitionStore.getPartition(2);
+ partitionStore.putPartition(partition2);
Partition<IntWritable, IntWritable, NullWritable,
IntWritable> partition3 = partitionStore.removePartition(3);
Partition<IntWritable, IntWritable, NullWritable, IntWritable> partition4 =
partitionStore.getPartition(4);
+ partitionStore.putPartition(partition4);
assertEquals(3, partitionStore.getNumPartitions());
assertEquals(3, Iterables.size(partitionStore.getPartitionIds()));
- assertEquals(3, Iterables.size(partitionStore.getPartitions()));
+ int partitionsNumber = 0;
+ for (Integer partitionId : partitionStore.getPartitionIds()) {
+ Partition<IntWritable, IntWritable, NullWritable, IntWritable> p =
+ partitionStore.getPartition(partitionId);
+ partitionStore.putPartition(p);
+ partitionsNumber++;
+ }
+ assertEquals(3, partitionsNumber);
assertTrue(partitionStore.hasPartition(1));
assertTrue(partitionStore.hasPartition(2));
assertFalse(partitionStore.hasPartition(3));