You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2013/02/09 01:11:22 UTC
git commit: GIRAPH-504: Create PartitionContext (majakabiljo)
Updated Branches:
refs/heads/trunk 86c2f657f -> a6cb05bcb
GIRAPH-504: Create PartitionContext (majakabiljo)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/a6cb05bc
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/a6cb05bc
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/a6cb05bc
Branch: refs/heads/trunk
Commit: a6cb05bcb4f0fcbf7477297f15237d3536b6d658
Parents: 86c2f65
Author: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Authored: Fri Feb 8 16:07:10 2013 -0800
Committer: Maja Kabiljo <ma...@maja-mbp.thefacebook.com>
Committed: Fri Feb 8 16:09:03 2013 -0800
----------------------------------------------------------------------
CHANGELOG | 2 +
.../java/org/apache/giraph/conf/GiraphClasses.java | 36 +++++
.../apache/giraph/conf/GiraphConfiguration.java | 13 ++
.../org/apache/giraph/conf/GiraphConstants.java | 2 +
.../conf/ImmutableClassesGiraphConfiguration.java | 19 +++
.../examples/PartitionContextTestVertex.java | 115 +++++++++++++++
.../org/apache/giraph/graph/ComputeCallable.java | 15 ++
.../java/org/apache/giraph/graph/GraphState.java | 11 ++
.../apache/giraph/partition/BasicPartition.java | 106 +++++++++++++
.../giraph/partition/ByteArrayPartition.java | 64 ++-------
.../giraph/partition/DefaultPartitionContext.java | 34 +++++
.../org/apache/giraph/partition/Partition.java | 12 ++
.../apache/giraph/partition/PartitionContext.java | 45 ++++++
.../apache/giraph/partition/SimplePartition.java | 56 +------
.../main/java/org/apache/giraph/vertex/Vertex.java | 10 ++
.../src/test/java/org/apache/giraph/BspCase.java | 3 +
.../org/apache/giraph/TestPartitionContext.java | 72 +++++++++
17 files changed, 517 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 3524ae3..c060209 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 0.2.0 - unreleased
+ GIRAPH-504: Create PartitionContext (majakabiljo)
+
GIRAPH-499: Giraph should not reserve minimum reduce slot memory 1024 since we never use it (ereisman)
GIRAPH-508: Increase the limit on the number of partitions (majakabiljo)
http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
index d2641f1..5c2a01a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphClasses.java
@@ -21,6 +21,8 @@ import org.apache.giraph.aggregators.AggregatorWriter;
import org.apache.giraph.combiner.Combiner;
import org.apache.giraph.master.DefaultMasterCompute;
import org.apache.giraph.graph.DefaultVertexResolver;
+import org.apache.giraph.partition.DefaultPartitionContext;
+import org.apache.giraph.partition.PartitionContext;
import org.apache.giraph.worker.DefaultWorkerContext;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.master.MasterCompute;
@@ -83,6 +85,8 @@ public class GiraphClasses<I extends WritableComparable,
/** Vertex resolver class - cached for fast access */
protected Class<? extends VertexResolver<I, V, E, M>> vertexResolverClass;
+ /** Partition context class - cached for fast access */
+ protected Class<? extends PartitionContext> partitionContextClass;
/** Worker context class - cached for fast access */
protected Class<? extends WorkerContext> workerContextClass;
/** Master compute class - cached for fast access */
@@ -145,6 +149,8 @@ public class GiraphClasses<I extends WritableComparable,
vertexResolverClass = (Class<? extends VertexResolver<I, V, E, M>>)
conf.getClass(VERTEX_RESOLVER_CLASS,
DefaultVertexResolver.class, VertexResolver.class);
+ partitionContextClass = conf.getClass(PARTITION_CONTEXT_CLASS,
+ DefaultPartitionContext.class, PartitionContext.class);
workerContextClass = conf.getClass(WORKER_CONTEXT_CLASS,
DefaultWorkerContext.class, WorkerContext.class);
masterComputeClass = conf.getClass(MASTER_COMPUTE_CLASS,
@@ -329,6 +335,24 @@ public class GiraphClasses<I extends WritableComparable,
}
/**
+ * Check if PartitionContext is set
+ *
+ * @return true if PartitionContext is set
+ */
+ public boolean hasPartitionContextClass() {
+ return partitionContextClass != null;
+ }
+
+ /**
+ * Get PartitionContext used
+ *
+ * @return PartitionContext
+ */
+ public Class<? extends PartitionContext> getPartitionContextClass() {
+ return partitionContextClass;
+ }
+
+ /**
* Check if WorkerContext is set
*
* @return true if WorkerContext is set
@@ -523,6 +547,18 @@ public class GiraphClasses<I extends WritableComparable,
}
/**
+ * Set PartitionContext used
+ *
+ * @param partitionContextClass PartitionContext class to set
+ * @return this
+ */
+ public GiraphClasses setPartitionContextClass(
+ Class<? extends PartitionContext> partitionContextClass) {
+ this.partitionContextClass = partitionContextClass;
+ return this;
+ }
+
+ /**
* Set WorkerContext used
*
* @param workerContextClass WorkerContext class to set
http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
index 7e48103..dc5c84f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConfiguration.java
@@ -30,6 +30,7 @@ import org.apache.giraph.master.MasterCompute;
import org.apache.giraph.master.MasterObserver;
import org.apache.giraph.partition.GraphPartitionerFactory;
import org.apache.giraph.partition.Partition;
+import org.apache.giraph.partition.PartitionContext;
import org.apache.giraph.vertex.Vertex;
import org.apache.giraph.worker.WorkerContext;
import org.apache.giraph.worker.WorkerObserver;
@@ -276,6 +277,18 @@ public class GiraphConfiguration extends Configuration
}
/**
+ * Set the partition context class (optional)
+ *
+ * @param partitionContextClass Determines what code is executed for each
+ * partition before and after each superstep
+ */
+ public final void setPartitionContextClass(
+ Class<? extends PartitionContext> partitionContextClass) {
+ setClass(PARTITION_CONTEXT_CLASS, partitionContextClass,
+ PartitionContext.class);
+ }
+
+ /**
* Set the worker context class (optional)
*
* @param workerContextClass Determines what code is executed on a each
http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index fb4e8a3..e3d8ff3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -65,6 +65,8 @@ public interface GiraphConstants {
String EDGE_VALUE_CLASS = "giraph.edgeValueClass";
/** Message value class */
String MESSAGE_VALUE_CLASS = "giraph.messageValueClass";
+ /** Partition context class */
+ String PARTITION_CONTEXT_CLASS = "giraph.partitionContextClass";
/** Worker context class */
String WORKER_CONTEXT_CLASS = "giraph.workerContextClass";
/** AggregatorWriter class - optional */
http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
index 30a7da7..3e158af 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/ImmutableClassesGiraphConfiguration.java
@@ -24,6 +24,7 @@ import org.apache.giraph.job.GiraphJobObserver;
import org.apache.giraph.io.EdgeInputFormat;
import org.apache.giraph.graph.GraphState;
import org.apache.giraph.master.MasterCompute;
+import org.apache.giraph.partition.PartitionContext;
import org.apache.giraph.vertex.Vertex;
import org.apache.giraph.io.VertexInputFormat;
import org.apache.giraph.io.VertexOutputFormat;
@@ -287,6 +288,24 @@ public class ImmutableClassesGiraphConfiguration<I extends WritableComparable,
}
/**
+ * Get the user's subclassed PartitionContext.
+ *
+ * @return User's partition context class
+ */
+ public Class<? extends PartitionContext> getPartitionContextClass() {
+ return classes.getPartitionContextClass();
+ }
+
+ /**
+ * Create a user partition context
+ *
+ * @return Instantiated user partition context
+ */
+ public PartitionContext createPartitionContext() {
+ return ReflectionUtils.newInstance(getPartitionContextClass(), this);
+ }
+
+ /**
* Get the user's subclassed WorkerContext.
*
* @return User's worker context class
http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/main/java/org/apache/giraph/examples/PartitionContextTestVertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/examples/PartitionContextTestVertex.java b/giraph-core/src/main/java/org/apache/giraph/examples/PartitionContextTestVertex.java
new file mode 100644
index 0000000..f86c323
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/examples/PartitionContextTestVertex.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.examples;
+
+import org.apache.giraph.partition.DefaultPartitionContext;
+import org.apache.giraph.vertex.EdgeListVertex;
+import org.apache.giraph.worker.DefaultWorkerContext;
+import org.apache.giraph.worker.WorkerContext;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+
+import java.io.IOException;
+
+/**
+ * Vertex to test the functionality of PartitionContext
+ */
+public class PartitionContextTestVertex extends
+ EdgeListVertex<LongWritable, DoubleWritable, FloatWritable,
+ DoubleWritable> {
+ /** How many compute threads to use in the test */
+ public static final int NUM_COMPUTE_THREADS = 10;
+ /** How many vertices to create for the test */
+ public static final int NUM_VERTICES = 100;
+ /** How many partitions to have */
+ public static final int NUM_PARTITIONS = 25;
+
+ @Override
+ public void compute(Iterable<DoubleWritable> messages) throws IOException {
+ TestPartitionContextPartitionContext partitionContext =
+ (TestPartitionContextPartitionContext) getPartitionContext();
+ partitionContext.counter++;
+ if (getSuperstep() > 5) {
+ voteToHalt();
+ }
+ }
+
+ /**
+ * PartitionContext for TestPartitionContext
+ */
+ public static class TestPartitionContextPartitionContext extends
+ DefaultPartitionContext {
+ /**
+ * The counter should hold the number of vertices in this partition,
+ * plus the current superstep
+ */
+ private long counter;
+
+ @Override
+ public void preSuperstep(WorkerContext workerContext) {
+ counter =
+ ((TestPartitionContextWorkerContext) workerContext).superstepCounter;
+ }
+
+ @Override
+ public void postSuperstep(WorkerContext workerContext) {
+ ((TestPartitionContextWorkerContext) workerContext).totalCounter +=
+ counter;
+ }
+ }
+
+ /**
+ * WorkerContext for TestPartitionContext
+ */
+ public static class TestPartitionContextWorkerContext extends
+ DefaultWorkerContext {
+ /** Current superstep */
+ private long superstepCounter;
+ /**
+ * This counter should hold the sum of PartitionContext's counters
+ */
+ private long totalCounter;
+
+ @Override
+ public void preSuperstep() {
+ superstepCounter = getSuperstep();
+ totalCounter = 0;
+ }
+
+ @Override
+ public void postSuperstep() {
+ assertEquals(NUM_PARTITIONS * superstepCounter + getTotalNumVertices(),
+ totalCounter);
+ }
+ }
+
+ /**
+ * Throws exception if values are not equal.
+ *
+ * @param expected Expected value
+ * @param actual Actual value
+ */
+ private static void assertEquals(long expected, long actual) {
+ if (expected != actual) {
+ throw new RuntimeException("expected: " + expected +
+ ", actual: " + actual);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
index 94ed6d9..c7aff7c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
@@ -27,6 +27,7 @@ import org.apache.giraph.metrics.MetricNames;
import org.apache.giraph.metrics.SuperstepMetricsRegistry;
import org.apache.giraph.metrics.TimerDesc;
import org.apache.giraph.partition.Partition;
+import org.apache.giraph.partition.PartitionContext;
import org.apache.giraph.partition.PartitionStats;
import org.apache.giraph.time.SystemTime;
import org.apache.giraph.time.Time;
@@ -34,6 +35,7 @@ import org.apache.giraph.time.Times;
import org.apache.giraph.utils.MemoryUtils;
import org.apache.giraph.utils.TimedLogger;
import org.apache.giraph.vertex.Vertex;
+import org.apache.giraph.worker.WorkerContext;
import org.apache.giraph.worker.WorkerThreadAggregatorUsage;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -197,6 +199,15 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
new PartitionStats(partition.getId(), 0, 0, 0, 0);
// Make sure this is thread-safe across runs
synchronized (partition) {
+ // Prepare Partition context
+ WorkerContext workerContext =
+ graphState.getGraphTaskManager().getWorkerContext();
+ PartitionContext partitionContext = partition.getPartitionContext();
+ synchronized (workerContext) {
+ partitionContext.preSuperstep(workerContext);
+ }
+ graphState.setPartitionContext(partition.getPartitionContext());
+
for (Vertex<I, V, E, M> vertex : partition) {
// Make sure every vertex has this thread's
// graphState before computing
@@ -229,6 +240,10 @@ public class ComputeCallable<I extends WritableComparable, V extends Writable,
}
messageStore.clearPartition(partition.getId());
+
+ synchronized (workerContext) {
+ partitionContext.postSuperstep(workerContext);
+ }
}
return partitionStats;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/main/java/org/apache/giraph/graph/GraphState.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphState.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphState.java
index 9cdec7c..93ad5df 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphState.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphState.java
@@ -18,6 +18,7 @@
package org.apache.giraph.graph;
import org.apache.giraph.comm.WorkerClientRequestProcessor;
+import org.apache.giraph.partition.PartitionContext;
import org.apache.giraph.worker.WorkerAggregatorUsage;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -49,6 +50,8 @@ E extends Writable, M extends Writable> {
workerClientRequestProcessor;
/** Worker aggregator usage */
private final WorkerAggregatorUsage workerAggregatorUsage;
+ /** Partition context */
+ private PartitionContext partitionContext;
/**
* Constructor
@@ -106,6 +109,14 @@ E extends Writable, M extends Writable> {
return workerAggregatorUsage;
}
+ public void setPartitionContext(PartitionContext partitionContext) {
+ this.partitionContext = partitionContext;
+ }
+
+ public PartitionContext getPartitionContext() {
+ return partitionContext;
+ }
+
@Override
public String toString() {
return "(superstep=" + superstep + ",numVertices=" + numVertices + "," +
http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartition.java
new file mode 100644
index 0000000..dc9192e
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/BasicPartition.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.Progressable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Basic partition class for other partitions to extend. Holds partition id,
+ * configuration, progressable and partition context
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public abstract class BasicPartition<I extends WritableComparable,
+ V extends Writable, E extends Writable, M extends Writable>
+ implements Partition<I, V, E, M> {
+ /** Configuration from the worker */
+ private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
+ /** Partition id */
+ private int id;
+ /** Context used to report progress */
+ private Progressable progressable;
+ /** Partition context */
+ private PartitionContext partitionContext;
+
+ @Override
+ public void initialize(int partitionId, Progressable progressable) {
+ setId(partitionId);
+ setProgressable(progressable);
+ partitionContext = conf.createPartitionContext();
+ }
+
+ @Override
+ public void setConf(
+ ImmutableClassesGiraphConfiguration<I, V, E, M> configuration) {
+ conf = configuration;
+ }
+
+ @Override
+ public ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
+ return conf;
+ }
+
+ @Override
+ public int getId() {
+ return id;
+ }
+
+ @Override
+ public void setId(int id) {
+ this.id = id;
+ }
+
+ @Override
+ public PartitionContext getPartitionContext() {
+ return partitionContext;
+ }
+
+ @Override
+ public void progress() {
+ if (progressable != null) {
+ progressable.progress();
+ }
+ }
+
+ @Override
+ public void setProgressable(Progressable progressable) {
+ this.progressable = progressable;
+ }
+
+ @Override
+ public void write(DataOutput output) throws IOException {
+ output.writeInt(id);
+ }
+
+ @Override
+ public void readFields(DataInput input) throws IOException {
+ id = input.readInt();
+ partitionContext = conf.createPartitionContext();
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
index d34af11..1298918 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/ByteArrayPartition.java
@@ -27,7 +27,6 @@ import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.vertex.Vertex;
import org.apache.giraph.utils.UnsafeByteArrayInputStream;
import org.apache.giraph.utils.WritableUtils;
@@ -48,21 +47,15 @@ import org.apache.log4j.Logger;
*/
public class ByteArrayPartition<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
- implements Partition<I, V, E, M> {
+ extends BasicPartition<I, V, E, M> {
/** Class logger */
private static final Logger LOG = Logger.getLogger(ByteArrayPartition.class);
- /** Configuration from the worker */
- private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
- /** Partition id */
- private int id;
/**
* Vertex map for this range (keyed by index). Note that the byte[] is a
* serialized vertex with the first four bytes as the length of the vertex
* to read.
*/
private ConcurrentMap<I, byte[]> vertexMap;
- /** Context used to report progress */
- private Progressable progressable;
/** Representative vertex */
private Vertex<I, V, E, M> representativeVertex;
/** Use unsafe serialization */
@@ -75,12 +68,11 @@ public class ByteArrayPartition<I extends WritableComparable,
@Override
public void initialize(int partitionId, Progressable progressable) {
- setId(partitionId);
- setProgressable(progressable);
+ super.initialize(partitionId, progressable);
vertexMap = new MapMaker().concurrencyLevel(
- conf.getNettyServerExecutionConcurrency()).makeMap();
- representativeVertex = conf.createVertex();
- useUnsafeSerialization = conf.useUnsafeSerialization();
+ getConf().getNettyServerExecutionConcurrency()).makeMap();
+ representativeVertex = getConf().createVertex();
+ useUnsafeSerialization = getConf().useUnsafeSerialization();
}
@Override
@@ -152,21 +144,6 @@ public class ByteArrayPartition<I extends WritableComparable,
}
@Override
- public int getId() {
- return id;
- }
-
- @Override
- public void setId(int id) {
- this.id = id;
- }
-
- @Override
- public void setProgressable(Progressable progressable) {
- this.progressable = progressable;
- }
-
- @Override
public void saveVertex(Vertex<I, V, E, M> vertex) {
// Reuse the old buffer whenever possible
byte[] oldVertexData = vertexMap.get(vertex.getId());
@@ -183,12 +160,10 @@ public class ByteArrayPartition<I extends WritableComparable,
@Override
public void write(DataOutput output) throws IOException {
- output.writeInt(id);
+ super.write(output);
output.writeInt(vertexMap.size());
for (Map.Entry<I, byte[]> entry : vertexMap.entrySet()) {
- if (progressable != null) {
- progressable.progress();
- }
+ progress();
entry.getKey().write(output);
// Note here that we are writing the size of the vertex data first
// as it is encoded in the first four bytes of the byte[]
@@ -207,18 +182,16 @@ public class ByteArrayPartition<I extends WritableComparable,
@Override
public void readFields(DataInput input) throws IOException {
- id = input.readInt();
+ super.readFields(input);
int size = input.readInt();
vertexMap = new MapMaker().concurrencyLevel(
- conf.getNettyServerExecutionConcurrency()).initialCapacity(
+ getConf().getNettyServerExecutionConcurrency()).initialCapacity(
size).makeMap();
- representativeVertex = conf.createVertex();
- useUnsafeSerialization = conf.useUnsafeSerialization();
+ representativeVertex = getConf().createVertex();
+ useUnsafeSerialization = getConf().useUnsafeSerialization();
for (int i = 0; i < size; ++i) {
- if (progressable != null) {
- progressable.progress();
- }
- I vertexId = conf.createVertexId();
+ progress();
+ I vertexId = getConf().createVertexId();
vertexId.readFields(input);
int vertexDataSize = input.readInt();
byte[] vertexData = new byte[vertexDataSize];
@@ -231,17 +204,6 @@ public class ByteArrayPartition<I extends WritableComparable,
}
@Override
- public void setConf(
- ImmutableClassesGiraphConfiguration<I, V, E, M> configuration) {
- conf = configuration;
- }
-
- @Override
- public ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
- return conf;
- }
-
- @Override
public Iterator<Vertex<I, V, E, M>> iterator() {
return new RepresentativeVertexIterator();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/main/java/org/apache/giraph/partition/DefaultPartitionContext.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/DefaultPartitionContext.java b/giraph-core/src/main/java/org/apache/giraph/partition/DefaultPartitionContext.java
new file mode 100644
index 0000000..c22c802
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/DefaultPartitionContext.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import org.apache.giraph.worker.WorkerContext;
+
+/**
+ * Empty implementation of {@link PartitionContext}
+ */
+public class DefaultPartitionContext implements PartitionContext {
+ @Override
+ public void preSuperstep(WorkerContext workerContext) {
+ }
+
+ @Override
+ public void postSuperstep(WorkerContext workerContext) {
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java b/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java
index 55ce8c0..657c054 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/Partition.java
@@ -107,6 +107,11 @@ public interface Partition<I extends WritableComparable,
void setId(int id);
/**
+ * Report progress.
+ */
+ void progress();
+
+ /**
* Set the context.
*
* @param progressable Progressable
@@ -119,4 +124,11 @@ public interface Partition<I extends WritableComparable,
* @param vertex Vertex to save
*/
void saveVertex(Vertex<I, V, E, M> vertex);
+
+ /**
+ * Get partition context
+ *
+ * @return Partition context
+ */
+ PartitionContext getPartitionContext();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/main/java/org/apache/giraph/partition/PartitionContext.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionContext.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionContext.java
new file mode 100644
index 0000000..412f6e3
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionContext.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import org.apache.giraph.worker.WorkerContext;
+
+/**
+ * PartitionContext allows for the execution of user code
+ * on a per-partition basis. There's one PartitionContext per partition.
+ */
+public interface PartitionContext {
+ /**
+ * Execute user code.
+ * This method is executed once for each partition before computation for
+ * that partition starts.
+ *
+ * @param workerContext Worker context
+ */
+ void preSuperstep(WorkerContext workerContext);
+
+ /**
+ * Execute user code.
+ * This method is executed once for each partition after computation in
+ * current superstep for that partition ends.
+ *
+ * @param workerContext Worker context
+ */
+ void postSuperstep(WorkerContext workerContext);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
index 479011f..cbf6bc3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartition.java
@@ -19,7 +19,6 @@
package org.apache.giraph.partition;
import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.vertex.Vertex;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -46,15 +45,9 @@ import java.util.concurrent.ConcurrentSkipListMap;
@SuppressWarnings("rawtypes")
public class SimplePartition<I extends WritableComparable,
V extends Writable, E extends Writable, M extends Writable>
- implements Partition<I, V, E, M> {
- /** Configuration from the worker */
- private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
- /** Partition id */
- private int id;
+ extends BasicPartition<I, V, E, M> {
/** Vertex map for this range (keyed by index) */
private ConcurrentMap<I, Vertex<I, V, E, M>> vertexMap;
- /** Context used to report progress */
- private Progressable progressable;
/**
* Constructor for reflection.
@@ -63,9 +56,8 @@ public class SimplePartition<I extends WritableComparable,
@Override
public void initialize(int partitionId, Progressable progressable) {
- setId(partitionId);
- setProgressable(progressable);
- if (conf.getBoolean(GiraphConstants.USE_OUT_OF_CORE_MESSAGES,
+ super.initialize(partitionId, progressable);
+ if (getConf().getBoolean(GiraphConstants.USE_OUT_OF_CORE_MESSAGES,
GiraphConstants.USE_OUT_OF_CORE_MESSAGES_DEFAULT)) {
vertexMap = new ConcurrentSkipListMap<I, Vertex<I, V, E, M>>();
} else {
@@ -110,21 +102,6 @@ public class SimplePartition<I extends WritableComparable,
}
@Override
- public int getId() {
- return id;
- }
-
- @Override
- public void setId(int id) {
- this.id = id;
- }
-
- @Override
- public void setProgressable(Progressable progressable) {
- this.progressable = progressable;
- }
-
- @Override
public void saveVertex(Vertex<I, V, E, M> vertex) {
// No-op, vertices are stored as Java objects in this partition
}
@@ -136,19 +113,17 @@ public class SimplePartition<I extends WritableComparable,
@Override
public void readFields(DataInput input) throws IOException {
- if (conf.getBoolean(GiraphConstants.USE_OUT_OF_CORE_MESSAGES,
+ super.readFields(input);
+ if (getConf().getBoolean(GiraphConstants.USE_OUT_OF_CORE_MESSAGES,
GiraphConstants.USE_OUT_OF_CORE_MESSAGES_DEFAULT)) {
vertexMap = new ConcurrentSkipListMap<I, Vertex<I, V, E, M>>();
} else {
vertexMap = Maps.newConcurrentMap();
}
- id = input.readInt();
int vertices = input.readInt();
for (int i = 0; i < vertices; ++i) {
- Vertex<I, V, E, M> vertex = conf.createVertex();
- if (progressable != null) {
- progressable.progress();
- }
+ Vertex<I, V, E, M> vertex = getConf().createVertex();
+ progress();
vertex.readFields(input);
if (vertexMap.put(vertex.getId(), vertex) != null) {
throw new IllegalStateException(
@@ -160,28 +135,15 @@ public class SimplePartition<I extends WritableComparable,
@Override
public void write(DataOutput output) throws IOException {
- output.writeInt(id);
+ super.write(output);
output.writeInt(vertexMap.size());
for (Vertex vertex : vertexMap.values()) {
- if (progressable != null) {
- progressable.progress();
- }
+ progress();
vertex.write(output);
}
}
@Override
- public void setConf(
- ImmutableClassesGiraphConfiguration<I, V, E, M> configuration) {
- this.conf = configuration;
- }
-
- @Override
- public ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
- return conf;
- }
-
- @Override
public Iterator<Vertex<I, V, E, M>> iterator() {
return vertexMap.values().iterator();
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/main/java/org/apache/giraph/vertex/Vertex.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/vertex/Vertex.java b/giraph-core/src/main/java/org/apache/giraph/vertex/Vertex.java
index 974232e..db6dca3 100644
--- a/giraph-core/src/main/java/org/apache/giraph/vertex/Vertex.java
+++ b/giraph-core/src/main/java/org/apache/giraph/vertex/Vertex.java
@@ -23,6 +23,7 @@ import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.DefaultEdge;
import org.apache.giraph.graph.Edge;
import org.apache.giraph.graph.GraphState;
+import org.apache.giraph.partition.PartitionContext;
import org.apache.giraph.partition.PartitionOwner;
import org.apache.giraph.worker.WorkerAggregatorUsage;
import org.apache.giraph.worker.WorkerContext;
@@ -328,6 +329,15 @@ public abstract class Vertex<I extends WritableComparable,
}
/**
+ * Get the partition context, as provided by the current graph state
+ *
+ * @return Partition context held by the graph state
+ */
+ public PartitionContext getPartitionContext() {
+ return getGraphState().getPartitionContext();
+ }
+
+ /**
* Get the worker context
*
* @return WorkerContext context
http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/test/java/org/apache/giraph/BspCase.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/BspCase.java b/giraph-core/src/test/java/org/apache/giraph/BspCase.java
index 6aab533..0fe9fda 100644
--- a/giraph-core/src/test/java/org/apache/giraph/BspCase.java
+++ b/giraph-core/src/test/java/org/apache/giraph/BspCase.java
@@ -181,6 +181,9 @@ public class BspCase implements Watcher {
if (classes.hasVertexOutputFormat()) {
conf.setVertexOutputFormatClass(classes.getVertexOutputFormatClass());
}
+ if (classes.hasPartitionContextClass()) {
+ conf.setPartitionContextClass(classes.getPartitionContextClass());
+ }
if (classes.hasWorkerContextClass()) {
conf.setWorkerContextClass(classes.getWorkerContextClass());
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/a6cb05bc/giraph-core/src/test/java/org/apache/giraph/TestPartitionContext.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/TestPartitionContext.java b/giraph-core/src/test/java/org/apache/giraph/TestPartitionContext.java
new file mode 100644
index 0000000..cdf1f65
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/TestPartitionContext.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph;
+
+import org.apache.giraph.conf.GiraphClasses;
+import org.apache.giraph.examples.PartitionContextTestVertex;
+import org.apache.giraph.examples.GeneratedVertexReader;
+import org.apache.giraph.examples.SimplePageRankVertex;
+import org.apache.giraph.job.GiraphJob;
+import org.apache.giraph.partition.HashMasterPartitioner;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+public class TestPartitionContext extends BspCase {
+ public TestPartitionContext() {
+ super(TestPartitionContext.class.getName());
+ }
+
+ @Test
+ public void testPartitionContext() throws IOException,
+ ClassNotFoundException, InterruptedException {
+ if (runningInDistributedMode()) {
+ System.out.println(
+ "testPartitionContext: Ignore this test in distributed mode.");
+ return;
+ }
+ GiraphClasses<LongWritable, DoubleWritable, FloatWritable, DoubleWritable>
+ classes = new GiraphClasses();
+ classes.setVertexClass(PartitionContextTestVertex.class);
+ classes.setVertexInputFormatClass(
+ SimplePageRankVertex.SimplePageRankVertexInputFormat.class);
+ classes.setWorkerContextClass(
+ PartitionContextTestVertex.TestPartitionContextWorkerContext.class);
+ classes.setPartitionContextClass(
+ PartitionContextTestVertex.TestPartitionContextPartitionContext.class);
+ GiraphJob job = prepareJob(getCallingMethodName(), classes);
+ // Run with multiple compute threads
+ job.getConfiguration().setNumComputeThreads(
+ PartitionContextTestVertex.NUM_COMPUTE_THREADS);
+ // Set the number of vertices produced by the generated vertex reader
+ job.getConfiguration().setInt(
+ GeneratedVertexReader.READER_VERTICES,
+ PartitionContextTestVertex.NUM_VERTICES);
+ // Set the user-requested partition count
+ job.getConfiguration().setInt(
+ HashMasterPartitioner.USER_PARTITION_COUNT,
+ PartitionContextTestVertex.NUM_PARTITIONS);
+ assertTrue(job.run(true));
+ }
+}