You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ik...@apache.org on 2016/01/27 20:44:17 UTC
[1/2] git commit: updated refs/heads/trunk to ca36f1d
Repository: giraph
Updated Branches:
refs/heads/trunk 1e802da3a -> ca36f1d49
http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java
deleted file mode 100644
index 0ee8d92..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.partition;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.giraph.worker.WorkerInfo;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import com.google.common.collect.Lists;
-import org.apache.log4j.Logger;
-
-/**
- * Abstracts and implements all WorkerGraphPartitioner logic on top of a single
- * user function - getPartitionIndex.
- *
- * @param <I> Vertex id type
- * @param <V> Vertex value type
- * @param <E> Edge value type
- */
-public abstract class SimpleWorkerPartitioner<I extends WritableComparable,
- V extends Writable, E extends Writable>
- implements WorkerGraphPartitioner<I, V, E> {
- /** Logger instance */
- private static final Logger LOG = Logger.getLogger(
- SimpleWorkerPartitioner.class);
- /** List of {@link PartitionOwner}s for this worker. */
- private List<PartitionOwner> partitionOwnerList = Lists.newArrayList();
- /** List of available workers */
- private Set<WorkerInfo> availableWorkers = new HashSet<>();
-
- @Override
- public PartitionOwner createPartitionOwner() {
- return new BasicPartitionOwner();
- }
-
- @Override
- public PartitionOwner getPartitionOwner(I vertexId) {
- return partitionOwnerList.get(
- getPartitionIndex(vertexId, partitionOwnerList.size(),
- availableWorkers.size()));
- }
-
- @Override
- public Collection<PartitionStats> finalizePartitionStats(
- Collection<PartitionStats> workerPartitionStats,
- PartitionStore<I, V, E> partitionStore) {
- // No modification necessary
- return workerPartitionStats;
- }
-
- @Override
- public PartitionExchange updatePartitionOwners(WorkerInfo myWorkerInfo,
- Collection<? extends PartitionOwner> masterSetPartitionOwners) {
- PartitionExchange exchange = PartitionBalancer.updatePartitionOwners(
- partitionOwnerList, myWorkerInfo, masterSetPartitionOwners);
- extractAvailableWorkers();
- return exchange;
- }
-
- @Override
- public Collection<? extends PartitionOwner> getPartitionOwners() {
- return partitionOwnerList;
- }
-
- /**
- * Update availableWorkers
- */
- public void extractAvailableWorkers() {
- availableWorkers.clear();
- for (PartitionOwner partitionOwner : partitionOwnerList) {
- availableWorkers.add(partitionOwner.getWorkerInfo());
- }
- LOG.info("After updating partitionOwnerList " + availableWorkers.size() +
- " workers are available");
- }
-
- /**
- * Calculates in which partition current vertex belongs to,
- * from interval [0, partitionCount).
- *
- * @param id Vertex id
- * @param partitionCount Number of partitions
- * @param workerCount Number of active workers
- * @return partition
- */
- protected abstract int getPartitionIndex(I id, int partitionCount,
- int workerCount);
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-core/src/main/java/org/apache/giraph/partition/WorkerGraphPartitionerImpl.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/WorkerGraphPartitionerImpl.java b/giraph-core/src/main/java/org/apache/giraph/partition/WorkerGraphPartitionerImpl.java
new file mode 100644
index 0000000..2087181
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/WorkerGraphPartitionerImpl.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.collect.Lists;
+import org.apache.log4j.Logger;
+
+/**
+ * Abstracts and implements all WorkerGraphPartitioner logic on top of a single
+ * user function - getPartitionIndex.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ */
+public abstract class WorkerGraphPartitionerImpl<I extends WritableComparable,
+ V extends Writable, E extends Writable>
+ implements WorkerGraphPartitioner<I, V, E> {
+ /** Logger instance */
+ private static final Logger LOG = Logger.getLogger(
+ WorkerGraphPartitionerImpl.class);
+ /** List of {@link PartitionOwner}s for this worker. */
+ private List<PartitionOwner> partitionOwnerList = Lists.newArrayList();
+ /** List of available workers */
+ private Set<WorkerInfo> availableWorkers = new HashSet<>();
+
+ @Override
+ public PartitionOwner createPartitionOwner() {
+ return new BasicPartitionOwner();
+ }
+
+ @Override
+ public PartitionOwner getPartitionOwner(I vertexId) {
+ return partitionOwnerList.get(
+ getPartitionIndex(vertexId, partitionOwnerList.size(),
+ availableWorkers.size()));
+ }
+
+ @Override
+ public Collection<PartitionStats> finalizePartitionStats(
+ Collection<PartitionStats> workerPartitionStats,
+ PartitionStore<I, V, E> partitionStore) {
+ // No modification necessary
+ return workerPartitionStats;
+ }
+
+ @Override
+ public PartitionExchange updatePartitionOwners(WorkerInfo myWorkerInfo,
+ Collection<? extends PartitionOwner> masterSetPartitionOwners) {
+ PartitionExchange exchange = PartitionBalancer.updatePartitionOwners(
+ partitionOwnerList, myWorkerInfo, masterSetPartitionOwners);
+ extractAvailableWorkers();
+ return exchange;
+ }
+
+ @Override
+ public Collection<? extends PartitionOwner> getPartitionOwners() {
+ return partitionOwnerList;
+ }
+
+ /**
+ * Update availableWorkers
+ */
+ public void extractAvailableWorkers() {
+ availableWorkers.clear();
+ for (PartitionOwner partitionOwner : partitionOwnerList) {
+ availableWorkers.add(partitionOwner.getWorkerInfo());
+ }
+ LOG.info("After updating partitionOwnerList " + availableWorkers.size() +
+ " workers are available");
+ }
+
+ /**
+ * Calculates in which partition current vertex belongs to,
+ * from interval [0, partitionCount).
+ *
+ * @param id Vertex id
+ * @param partitionCount Number of partitions
+ * @param workerCount Number of active workers
+ * @return partition
+ */
+ protected abstract int getPartitionIndex(I id, int partitionCount,
+ int workerCount);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-core/src/main/java/org/apache/giraph/writable/kryo/HadoopKryo.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/HadoopKryo.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/HadoopKryo.java
index 478a33b..49602a1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/HadoopKryo.java
+++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/HadoopKryo.java
@@ -41,6 +41,7 @@ import org.apache.giraph.writable.kryo.serializers.ReusableFieldSerializer;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
+import org.apache.log4j.Logger;
import org.objenesis.strategy.StdInstantiatorStrategy;
import com.esotericsoftware.kryo.Kryo;
@@ -106,6 +107,9 @@ public class HadoopKryo extends Kryo {
Random.class,
"it should be rarely serialized, since it would create same stream " +
"of numbers everywhere, use TransientRandom instead");
+ NON_SERIALIZABLE.put(
+ Logger.class,
+ "Logger must be a static field");
}
// Use chunked streams, so within same stream we can use both kryo and
http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java b/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java
index bf87491..a3c95bb 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestGraphPartitioner.java
@@ -18,13 +18,17 @@
package org.apache.giraph;
+import static org.apache.giraph.examples.GeneratedVertexReader.READER_VERTICES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.examples.GeneratedVertexReader;
import org.apache.giraph.examples.SimpleCheckpoint;
import org.apache.giraph.examples.SimpleSuperstepComputation.SimpleSuperstepVertexInputFormat;
import org.apache.giraph.examples.SimpleSuperstepComputation.SimpleSuperstepVertexOutputFormat;
-import org.apache.giraph.integration.SuperstepHashPartitionerFactory;
import org.apache.giraph.job.GiraphJob;
import org.apache.giraph.partition.HashRangePartitionerFactory;
import org.apache.giraph.partition.PartitionBalancer;
@@ -34,12 +38,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
-import java.io.IOException;
-
-import static org.apache.giraph.examples.GeneratedVertexReader.READER_VERTICES;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
/**
* Unit test for manual checkpoint restarting
*/
@@ -108,24 +106,6 @@ public class TestGraphPartitioner extends BspCase {
assertTrue(job.run(true));
verifyOutput(hdfs, outputPath);
- outputPath = getTempPath("testSuperstepHashPartitioner");
- conf = new GiraphConfiguration();
- conf.setComputationClass(
- SimpleCheckpoint.SimpleCheckpointComputation.class);
- conf.setWorkerContextClass(
- SimpleCheckpoint.SimpleCheckpointVertexWorkerContext.class);
- conf.setMasterComputeClass(
- SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class);
- conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
- conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
- job = prepareJob("testSuperstepHashPartitioner", conf, outputPath);
-
- job.getConfiguration().setGraphPartitionerFactoryClass(
- SuperstepHashPartitionerFactory.class);
-
- assertTrue(job.run(true));
- verifyOutput(hdfs, outputPath);
-
job = new GiraphJob("testHashRangePartitioner");
setupConfiguration(job);
job.getConfiguration().setComputationClass(
@@ -145,24 +125,6 @@ public class TestGraphPartitioner extends BspCase {
assertTrue(job.run(true));
verifyOutput(hdfs, outputPath);
- outputPath = getTempPath("testReverseIdSuperstepHashPartitioner");
- conf = new GiraphConfiguration();
- conf.setComputationClass(
- SimpleCheckpoint.SimpleCheckpointComputation.class);
- conf.setWorkerContextClass(
- SimpleCheckpoint.SimpleCheckpointVertexWorkerContext.class);
- conf.setMasterComputeClass(
- SimpleCheckpoint.SimpleCheckpointVertexMasterCompute.class);
- conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
- conf.setVertexOutputFormatClass(SimpleSuperstepVertexOutputFormat.class);
- job = prepareJob("testReverseIdSuperstepHashPartitioner", conf,
- outputPath);
- job.getConfiguration().setGraphPartitionerFactoryClass(
- SuperstepHashPartitionerFactory.class);
- GeneratedVertexReader.REVERSE_ID_ORDER.set(job.getConfiguration(), true);
- assertTrue(job.run(true));
- verifyOutput(hdfs, outputPath);
-
job = new GiraphJob("testSimpleRangePartitioner");
setupConfiguration(job);
job.getConfiguration().setComputationClass(
[2/2] git commit: updated refs/heads/trunk to ca36f1d
Posted by ik...@apache.org.
Use Partitions in LocalBlockRunner
Summary:
Speed up LocalBlockRunner by operating on vertices stored in partitions instead of on a TestGraph.
Along with this, deprecate the old (non-SimplePartitionerFactory) way of specifying partitioning:
SimplePartitionerFactory is renamed to the old name GraphPartitionerFactory, and the previous
GraphPartitionerFactory interface is renamed to GraphPartitionerFactoryInterface.
Test Plan:
Ran the speed benchmarks in TestLocalBlockRunnerSpeed (timings before -> after):
testEmptyIterationsSmallGraph()
6.5 -> 6.3
testEmptyIterationsSyntheticGraphLowDegree()
42.0 -> 13.8
testEmptyIterationsSyntheticGraphHighDegree()
3.6 -> 2.0
testPageRankSyntheticGraphLowDegree()
51.0 -> 47.2
testPageRankSyntheticGraphHighDegree()
20.3 -> 17.4
Reviewers: maja.kabiljo, sergey.edunov, dionysis.logothetis
Reviewed By: dionysis.logothetis
Differential Revision: https://reviews.facebook.net/D52425
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/ca36f1d4
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/ca36f1d4
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/ca36f1d4
Branch: refs/heads/trunk
Commit: ca36f1d499a70b1aee85ac637de53f153a9f4c92
Parents: 1e802da
Author: Igor Kabiljo <ik...@fb.com>
Authored: Tue Dec 29 15:14:32 2015 -0800
Committer: Igor Kabiljo <ik...@fb.com>
Committed: Wed Jan 27 11:44:02 2016 -0800
----------------------------------------------------------------------
.../api/local/TestLocalBlockRunnerSpeed.java | 144 +++++++++++++++++++
.../giraph/block_app/framework/BlockUtils.java | 7 +
.../framework/api/local/InternalApi.java | 67 +++++++--
.../framework/api/local/LocalBlockRunner.java | 71 +++++----
.../framework/api/local/VertexSaver.java | 34 -----
.../framework/internal/BlockMasterLogic.java | 20 ++-
.../internal/BlockWorkerContextLogic.java | 6 +-
.../test_setup/graphs/SyntheticGraphInit.java | 7 -
.../MultipleSimultanousMutationsTest.java | 89 ++++++++++++
.../org/apache/giraph/conf/GiraphConstants.java | 2 +-
.../SuperstepHashPartitionerFactory.java | 125 ----------------
.../apache/giraph/integration/package-info.java | 21 ---
.../partition/GraphPartitionerFactory.java | 96 ++++++++++---
.../GraphPartitionerFactoryInterface.java | 59 ++++++++
.../giraph/partition/HashMasterPartitioner.java | 117 ---------------
.../partition/HashPartitionerFactory.java | 23 +--
.../partition/HashRangePartitionerFactory.java | 25 ++--
.../partition/HashRangeWorkerPartitioner.java | 49 -------
.../giraph/partition/HashWorkerPartitioner.java | 77 ----------
.../LongMappingStorePartitionerFactory.java | 6 +-
.../partition/MasterGraphPartitionerImpl.java | 111 ++++++++++++++
.../SimpleIntRangePartitionerFactory.java | 6 +-
.../SimpleLongRangePartitionerFactory.java | 6 +-
.../partition/SimpleMasterPartitioner.java | 112 ---------------
.../partition/SimplePartitionerFactory.java | 121 ----------------
.../partition/SimpleWorkerPartitioner.java | 109 --------------
.../partition/WorkerGraphPartitionerImpl.java | 109 ++++++++++++++
.../apache/giraph/writable/kryo/HadoopKryo.java | 4 +
.../org/apache/giraph/TestGraphPartitioner.java | 50 +------
29 files changed, 753 insertions(+), 920 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/framework/api/local/TestLocalBlockRunnerSpeed.java
----------------------------------------------------------------------
diff --git a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/framework/api/local/TestLocalBlockRunnerSpeed.java b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/framework/api/local/TestLocalBlockRunnerSpeed.java
new file mode 100644
index 0000000..44145e3
--- /dev/null
+++ b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/framework/api/local/TestLocalBlockRunnerSpeed.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.block_app.framework.api.local;
+
+import org.apache.giraph.block_app.examples.pagerank.AbstractPageRankExampleBlockFactory;
+import org.apache.giraph.block_app.examples.pagerank.PageRankExampleBlockFactory;
+import org.apache.giraph.block_app.framework.BlockUtils;
+import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.framework.block.RepeatBlock;
+import org.apache.giraph.block_app.framework.piece.Piece;
+import org.apache.giraph.block_app.test_setup.TestGraphUtils;
+import org.apache.giraph.block_app.test_setup.graphs.EachVertexInit;
+import org.apache.giraph.block_app.test_setup.graphs.Small1GraphInit;
+import org.apache.giraph.block_app.test_setup.graphs.SyntheticGraphInit;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class TestLocalBlockRunnerSpeed {
+ public static class EmptyPiecesBlockFactory extends AbstractPageRankExampleBlockFactory {
+ @Override
+ public Block createBlock(GiraphConfiguration conf) {
+ return new RepeatBlock(NUM_ITERATIONS.get(conf), new Piece<>());
+ }
+ }
+
+ @BeforeClass
+ public static void warmup() throws Exception {
+ TestGraphUtils.runTest(
+ new Small1GraphInit<LongWritable, DoubleWritable, NullWritable>(),
+ null,
+ (GiraphConfiguration conf) -> {
+ LocalBlockRunner.RUN_ALL_CHECKS.set(conf, false);
+ BlockUtils.setBlockFactoryClass(conf, EmptyPiecesBlockFactory.class);
+ BlockUtils.LOG_EXECUTION_STATUS.set(conf, false);
+ AbstractPageRankExampleBlockFactory.NUM_ITERATIONS.set(conf, 1000);
+ });
+ }
+
+ @Test
+ @Ignore("use for benchmarking")
+ public void testEmptyIterationsSmallGraph() throws Exception {
+ TestGraphUtils.runTest(
+ new Small1GraphInit<LongWritable, DoubleWritable, NullWritable>(),
+ null,
+ (GiraphConfiguration conf) -> {
+ LocalBlockRunner.RUN_ALL_CHECKS.set(conf, false);
+ BlockUtils.setBlockFactoryClass(conf, EmptyPiecesBlockFactory.class);
+ BlockUtils.LOG_EXECUTION_STATUS.set(conf, false);
+ AbstractPageRankExampleBlockFactory.NUM_ITERATIONS.set(conf, 10000);
+ });
+ }
+
+ @Test
+ @Ignore("use for benchmarking")
+ public void testEmptyIterationsSyntheticGraphLowDegree() throws Exception {
+ TestGraphUtils.runTest(
+ new SyntheticGraphInit<LongWritable, DoubleWritable, NullWritable>(),
+ null,
+ (GiraphConfiguration conf) -> {
+ LocalBlockRunner.RUN_ALL_CHECKS.set(conf, false);
+ BlockUtils.setBlockFactoryClass(conf, EmptyPiecesBlockFactory.class);
+ AbstractPageRankExampleBlockFactory.NUM_ITERATIONS.set(conf, 1000);
+
+ SyntheticGraphInit.NUM_VERTICES.set(conf, 500000);
+ SyntheticGraphInit.NUM_EDGES_PER_VERTEX.set(conf, 10);
+ SyntheticGraphInit.NUM_COMMUNITIES.set(conf, 1000);
+ });
+ }
+
+ @Test
+ @Ignore("use for benchmarking")
+ public void testEmptyIterationsSyntheticGraphHighDegree() throws Exception {
+ TestGraphUtils.runTest(
+ new SyntheticGraphInit<LongWritable, DoubleWritable, NullWritable>(),
+ null,
+ (GiraphConfiguration conf) -> {
+ LocalBlockRunner.RUN_ALL_CHECKS.set(conf, false);
+ BlockUtils.setBlockFactoryClass(conf, EmptyPiecesBlockFactory.class);
+ AbstractPageRankExampleBlockFactory.NUM_ITERATIONS.set(conf, 1000);
+
+ SyntheticGraphInit.NUM_VERTICES.set(conf, 50000);
+ SyntheticGraphInit.NUM_EDGES_PER_VERTEX.set(conf, 100);
+ SyntheticGraphInit.NUM_COMMUNITIES.set(conf, 1000);
+ });
+ }
+
+ @Test
+ @Ignore("use for benchmarking")
+ public void testPageRankSyntheticGraphLowDegree() throws Exception {
+ TestGraphUtils.runTest(
+ TestGraphUtils.chainModifiers(
+ new SyntheticGraphInit<LongWritable, DoubleWritable, NullWritable>(),
+ new EachVertexInit<>((vertex) -> vertex.getValue().set(1.0))),
+ null,
+ (GiraphConfiguration conf) -> {
+ LocalBlockRunner.RUN_ALL_CHECKS.set(conf, false);
+ BlockUtils.setBlockFactoryClass(conf, PageRankExampleBlockFactory.class);
+ AbstractPageRankExampleBlockFactory.NUM_ITERATIONS.set(conf, 100);
+
+ SyntheticGraphInit.NUM_VERTICES.set(conf, 500000);
+ SyntheticGraphInit.NUM_EDGES_PER_VERTEX.set(conf, 10);
+ SyntheticGraphInit.NUM_COMMUNITIES.set(conf, 1000);
+ });
+ }
+
+ @Test
+ @Ignore("use for benchmarking")
+ public void testPageRankSyntheticGraphHighDegree() throws Exception {
+ TestGraphUtils.runTest(
+ TestGraphUtils.chainModifiers(
+ new SyntheticGraphInit<LongWritable, DoubleWritable, NullWritable>(),
+ new EachVertexInit<>((vertex) -> vertex.getValue().set(1.0))),
+ null,
+ (GiraphConfiguration conf) -> {
+ LocalBlockRunner.RUN_ALL_CHECKS.set(conf, false);
+ BlockUtils.setBlockFactoryClass(conf, PageRankExampleBlockFactory.class);
+ AbstractPageRankExampleBlockFactory.NUM_ITERATIONS.set(conf, 100);
+
+ SyntheticGraphInit.NUM_VERTICES.set(conf, 50000);
+ SyntheticGraphInit.NUM_EDGES_PER_VERTEX.set(conf, 100);
+ SyntheticGraphInit.NUM_COMMUNITIES.set(conf, 1000);
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java
index e00909c..6bf6d92 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/BlockUtils.java
@@ -28,6 +28,7 @@ import org.apache.giraph.block_app.framework.api.giraph.BlockWorkerContext;
import org.apache.giraph.block_app.framework.block.Block;
import org.apache.giraph.block_app.framework.piece.AbstractPiece;
import org.apache.giraph.block_app.framework.piece.Piece;
+import org.apache.giraph.conf.BooleanConfOption;
import org.apache.giraph.conf.ClassConfOption;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
@@ -58,6 +59,12 @@ public class BlockUtils {
Object.class, Object.class,
"block worker context value class");
+ /** Property describing whether to log execution status as application runs */
+ public static final
+ BooleanConfOption LOG_EXECUTION_STATUS = new BooleanConfOption(
+ "giraph.block_utils.log_execution_status", true,
+ "Log execution status (of which pieces are being executed, etc)");
+
private static final Logger LOG = Logger.getLogger(BlockUtils.class);
/** Dissallow constructor */
http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java
index 3ca8b1c..a4703b4 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java
@@ -18,7 +18,6 @@
package org.apache.giraph.block_app.framework.api.local;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -53,6 +52,8 @@ import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexMutations;
import org.apache.giraph.graph.VertexResolver;
import org.apache.giraph.master.AggregatorToGlobalCommTranslation;
+import org.apache.giraph.partition.GraphPartitionerFactory;
+import org.apache.giraph.partition.Partition;
import org.apache.giraph.reducers.ReduceOperation;
import org.apache.giraph.utils.TestGraph;
import org.apache.giraph.utils.WritableUtils;
@@ -74,7 +75,10 @@ import com.google.common.base.Preconditions;
@SuppressWarnings({ "rawtypes", "unchecked" })
class InternalApi<I extends WritableComparable, V extends Writable,
E extends Writable> implements BlockMasterApi, BlockOutputHandleAccessor {
- private final TestGraph<I, V, E> graph;
+ private final TestGraph<I, V, E> inputGraph;
+ private final List<Partition<I, V, E>> partitions;
+ private final GraphPartitionerFactory<I, V, E> partitionerFactory;
+
private final ImmutableClassesGiraphConfiguration conf;
private final boolean runAllChecks;
private final InternalAggregators globalComm;
@@ -94,8 +98,22 @@ class InternalApi<I extends WritableComparable, V extends Writable,
public InternalApi(
TestGraph<I, V, E> graph,
ImmutableClassesGiraphConfiguration conf,
+ int numPartitions,
boolean runAllChecks) {
- this.graph = graph;
+ this.inputGraph = graph;
+ this.partitions = new ArrayList<>(numPartitions);
+ for (int i = 0; i < numPartitions; i++) {
+ this.partitions.add(conf.createPartition(i, null));
+ }
+ this.partitionerFactory = conf.createGraphPartitioner();
+ Preconditions.checkNotNull(this.partitionerFactory);
+ Preconditions.checkState(this.partitions.size() == numPartitions);
+
+ for (Vertex<I, V, E> vertex : graph) {
+ getPartition(vertex.getId()).putVertex(vertex);
+ }
+ graph.clear();
+
this.conf = conf;
this.runAllChecks = runAllChecks;
this.globalComm = new InternalAggregators(runAllChecks);
@@ -362,8 +380,8 @@ class InternalApi<I extends WritableComparable, V extends Writable,
Collections.EMPTY_SET : previousMessages.targetsSet();
if (createVertexOnMsgs) {
for (I target : targets) {
- if (!graph.getVertices().containsKey(target)) {
- mutations.put(target, new VertexMutations<I, V, E>());
+ if (getPartition(target).getVertex(target) == null) {
+ mutations.putIfAbsent(target, new VertexMutations<I, V, E>());
}
}
}
@@ -371,23 +389,25 @@ class InternalApi<I extends WritableComparable, V extends Writable,
VertexResolver<I, V, E> vertexResolver = conf.createVertexResolver();
for (Map.Entry<I, VertexMutations<I, V, E>> entry : mutations.entrySet()) {
I vertexIndex = entry.getKey();
- Vertex<I, V, E> originalVertex = graph.getVertex(vertexIndex);
+ Vertex<I, V, E> originalVertex =
+ getPartition(vertexIndex).getVertex(vertexIndex);
VertexMutations<I, V, E> curMutations = entry.getValue();
Vertex<I, V, E> vertex = vertexResolver.resolve(
vertexIndex, originalVertex, curMutations,
targets.contains(vertexIndex));
if (vertex != null) {
- graph.addVertex(vertex);
+ getPartition(vertex.getId()).putVertex(vertex);
} else if (originalVertex != null) {
- graph.getVertices().remove(originalVertex.getId());
+ getPartition(originalVertex.getId()).removeVertex(
+ originalVertex.getId());
}
}
mutations.clear();
}
- public Collection<Vertex<I, V, E>> getAllVertices() {
- return graph.getVertices().values();
+ public List<Partition<I, V, E>> getPartitions() {
+ return partitions;
}
public InternalWorkerApi getWorkerApi() {
@@ -397,15 +417,19 @@ class InternalApi<I extends WritableComparable, V extends Writable,
@Override
public long getTotalNumEdges() {
int numEdges = 0;
- for (Vertex<I, V, E> vertex : graph.getVertices().values()) {
- numEdges += vertex.getNumEdges();
+ for (Partition<I, V, E> partition : partitions) {
+ numEdges += partition.getEdgeCount();
}
return numEdges;
}
@Override
public long getTotalNumVertices() {
- return graph.getVertices().size();
+ int numVertices = 0;
+ for (Partition<I, V, E> partition : partitions) {
+ numVertices += partition.getVertexCount();
+ }
+ return numVertices;
}
@Override
@@ -438,4 +462,21 @@ class InternalApi<I extends WritableComparable, V extends Writable,
public int getWorkerCount() {
return 1;
}
+
+ private int getPartitionId(I id) {
+ Preconditions.checkNotNull(id);
+ return partitionerFactory.getPartition(id, partitions.size(), 1);
+ }
+
+ private Partition<I, V, E> getPartition(I id) {
+ return partitions.get(getPartitionId(id));
+ }
+
+ public void postApplication() {
+ for (Partition<I, V, E> partition : partitions) {
+ for (Vertex<I, V, E> vertex : partition) {
+ inputGraph.setVertex(vertex);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java
index d582cb2..90aa8a2 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java
@@ -17,10 +17,8 @@
*/
package org.apache.giraph.block_app.framework.api.local;
-import java.util.ArrayList;
-import java.util.Collection;
+import java.io.IOException;
import java.util.List;
-import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -40,8 +38,11 @@ import org.apache.giraph.conf.BooleanConfOption;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.conf.IntConfOption;
import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.io.SimpleVertexWriter;
+import org.apache.giraph.partition.Partition;
import org.apache.giraph.utils.InternalVertexRunner;
import org.apache.giraph.utils.TestGraph;
+import org.apache.giraph.utils.Trimmable;
import org.apache.giraph.utils.WritableUtils;
import org.apache.giraph.writable.kryo.KryoWritableWrapper;
import org.apache.hadoop.io.Writable;
@@ -101,7 +102,7 @@ public class LocalBlockRunner {
public static
<I extends WritableComparable, V extends Writable, E extends Writable>
void runApp(TestGraph<I, V, E> graph) {
- VertexSaver<I, V, E> noOpVertexSaver = noOpVertexSaver();
+ SimpleVertexWriter<I, V, E> noOpVertexSaver = noOpVertexSaver();
runAppWithVertexOutput(graph, noOpVertexSaver);
}
@@ -113,7 +114,7 @@ public class LocalBlockRunner {
<I extends WritableComparable, V extends Writable, E extends Writable>
void runBlock(
TestGraph<I, V, E> graph, Block block, Object executionStage) {
- VertexSaver<I, V, E> noOpVertexSaver = noOpVertexSaver();
+ SimpleVertexWriter<I, V, E> noOpVertexSaver = noOpVertexSaver();
runBlockWithVertexOutput(
block, executionStage, graph, noOpVertexSaver);
}
@@ -126,7 +127,7 @@ public class LocalBlockRunner {
public static
<I extends WritableComparable, V extends Writable, E extends Writable>
void runAppWithVertexOutput(
- TestGraph<I, V, E> graph, final VertexSaver<I, V, E> vertexSaver) {
+ TestGraph<I, V, E> graph, final SimpleVertexWriter<I, V, E> vertexSaver) {
BlockFactory<?> factory = BlockUtils.createBlockFactory(graph.getConf());
runBlockWithVertexOutput(
factory.createBlock(graph.getConf()),
@@ -142,18 +143,18 @@ public class LocalBlockRunner {
<I extends WritableComparable, V extends Writable, E extends Writable>
void runBlockWithVertexOutput(
Block block, Object executionStage, TestGraph<I, V, E> graph,
- final VertexSaver<I, V, E> vertexSaver
+ final SimpleVertexWriter<I, V, E> vertexSaver
) {
Preconditions.checkNotNull(block);
Preconditions.checkNotNull(graph);
ImmutableClassesGiraphConfiguration<I, V, E> conf = graph.getConf();
- int numWorkers = NUM_THREADS.get(conf);
+ int numPartitions = NUM_THREADS.get(conf);
boolean runAllChecks = RUN_ALL_CHECKS.get(conf);
boolean serializeMaster = SERIALIZE_MASTER.get(conf);
final boolean doOutputDuringComputation = conf.doOutputDuringComputation();
final InternalApi internalApi =
- new InternalApi(graph, conf, runAllChecks);
+ new InternalApi(graph, conf, numPartitions, runAllChecks);
final InternalWorkerApi internalWorkerApi = internalApi.getWorkerApi();
BlockUtils.checkBlockTypes(block, executionStage, conf);
@@ -170,8 +171,7 @@ public class LocalBlockRunner {
}
}));
- ExecutorService executor = Executors.newFixedThreadPool(numWorkers);
- Random rand = new Random();
+ ExecutorService executor = Executors.newFixedThreadPool(numPartitions);
if (runAllChecks) {
for (Vertex<I, V, E> vertex : graph) {
@@ -204,9 +204,15 @@ public class LocalBlockRunner {
blockMasterLogic.computeNext(superstep);
if (workerPieces == null) {
if (!conf.doOutputDuringComputation()) {
- Collection<Vertex<I, V, E>> vertices = internalApi.getAllVertices();
- for (Vertex<I, V, E> vertex : vertices) {
- vertexSaver.saveVertex(vertex);
+ List<Partition<I, V, E>> partitions = internalApi.getPartitions();
+ for (Partition<I, V, E> partition : partitions) {
+ for (Vertex<I, V, E> vertex : partition) {
+ try {
+ vertexSaver.writeVertex(vertex);
+ } catch (IOException | InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
}
int left = executor.shutdownNow().size();
@@ -214,14 +220,7 @@ public class LocalBlockRunner {
break;
} else {
internalApi.afterMasterBeforeWorker(workerPieces);
- List<List<Vertex<I, V, E>>> verticesPerWorker = new ArrayList<>();
- for (int i = 0; i < numWorkers; i++) {
- verticesPerWorker.add(new ArrayList<Vertex<I, V, E>>());
- }
- Collection<Vertex<I, V, E>> allVertices = internalApi.getAllVertices();
- for (Vertex<I, V, E> vertex : allVertices) {
- verticesPerWorker.get(rand.nextInt(numWorkers)).add(vertex);
- }
+ List<Partition<I, V, E>> partitions = internalApi.getPartitions();
workerContextLogic.preSuperstep(
internalWorkerApi,
@@ -229,10 +228,10 @@ public class LocalBlockRunner {
KryoWritableWrapper.wrapAndCopy(workerPieces), superstep,
internalApi.takeWorkerMessages());
- final CountDownLatch latch = new CountDownLatch(numWorkers);
+ final CountDownLatch latch = new CountDownLatch(numPartitions);
final AtomicReference<Throwable> exception = new AtomicReference<>();
anyVertexAlive.set(false);
- for (final List<Vertex<I, V, E>> curVertices : verticesPerWorker) {
+ for (final Partition<I, V, E> partition : partitions) {
executor.execute(new Runnable() {
@Override
public void run() {
@@ -244,16 +243,28 @@ public class LocalBlockRunner {
BlockWorkerLogic localLogic = new BlockWorkerLogic(localPieces);
localLogic.preSuperstep(internalWorkerApi, internalWorkerApi);
- for (Vertex<I, V, E> vertex : curVertices) {
+ for (Vertex<I, V, E> vertex : partition) {
Iterable messages = internalApi.takeMessages(vertex.getId());
if (vertex.isHalted() && !Iterables.isEmpty(messages)) {
vertex.wakeUp();
}
+ // Equivalent of ComputeCallable.computePartition
if (!vertex.isHalted()) {
localLogic.compute(vertex, messages);
+
+ // Need to unwrap the mutated edges (possibly)
+ vertex.unwrapMutableEdges();
+ //Compact edges representation if possible
+ if (vertex instanceof Trimmable) {
+ ((Trimmable) vertex).trim();
+ }
+ // Write vertex to superstep output
+ // (no-op if it is not used)
if (doOutputDuringComputation) {
- vertexSaver.saveVertex(vertex);
+ vertexSaver.writeVertex(vertex);
}
+ // Need to save the vertex changes (possibly)
+ partition.saveVertex(vertex);
}
if (!vertex.isHalted()) {
@@ -295,14 +306,16 @@ public class LocalBlockRunner {
}
workerContextLogic.postApplication();
+ internalApi.postApplication();
}
private static
<I extends WritableComparable, E extends Writable, V extends Writable>
- VertexSaver<I, V, E> noOpVertexSaver() {
- return new VertexSaver<I, V, E>() {
+ SimpleVertexWriter<I, V, E> noOpVertexSaver() {
+ return new SimpleVertexWriter<I, V, E>() {
@Override
- public void saveVertex(Vertex<I, V, E> vertex) {
+ public void writeVertex(Vertex<I, V, E> vertex)
+ throws IOException, InterruptedException {
// No-op
}
};
http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/VertexSaver.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/VertexSaver.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/VertexSaver.java
deleted file mode 100644
index 0053644..0000000
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/VertexSaver.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.block_app.framework.api.local;
-
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Interface to use for saving vertices
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- */
-public interface VertexSaver<I extends WritableComparable, V extends Writable,
- E extends Writable> {
- void saveVertex(Vertex<I, V, E> vertex);
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockMasterLogic.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockMasterLogic.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockMasterLogic.java
index 4892a33..a52bb77 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockMasterLogic.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockMasterLogic.java
@@ -126,8 +126,12 @@ public class BlockMasterLogic<S> {
postApplication();
return null;
} else {
- LOG.info(
- "Master executing " + previousPiece + ", in superstep " + superstep);
+ boolean logExecutionStatus =
+ BlockUtils.LOG_EXECUTION_STATUS.get(masterApi.getConf());
+ if (logExecutionStatus) {
+ LOG.info("Master executing " + previousPiece +
+ ", in superstep " + superstep);
+ }
previousPiece.masterCompute(masterApi);
((BlockOutputHandleAccessor) masterApi).getBlockOutputHandle().
returnAllWriters();
@@ -149,8 +153,10 @@ public class BlockMasterLogic<S> {
BlockCounters.setStageCounters(
"Master finished stage: ", previousPiece.getExecutionStage(),
masterApi);
- LOG.info(
- "Master passing next " + nextPiece + ", in superstep " + superstep);
+ if (logExecutionStatus) {
+ LOG.info(
+ "Master passing next " + nextPiece + ", in superstep " + superstep);
+ }
// if there is nothing more to compute, no need for additional superstep
// this can only happen if application uses no pieces.
@@ -160,8 +166,10 @@ public class BlockMasterLogic<S> {
result = null;
} else {
result = new BlockWorkerPieces<>(previousPiece, nextPiece);
- LOG.info("Master in " + superstep + " superstep passing " +
- result + " to be executed");
+ if (logExecutionStatus) {
+ LOG.info("Master in " + superstep + " superstep passing " +
+ result + " to be executed");
+ }
}
previousPiece = nextPiece;
http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerContextLogic.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerContextLogic.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerContextLogic.java
index 8b8e174..ca2bb5a 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerContextLogic.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerContextLogic.java
@@ -65,8 +65,10 @@ public class BlockWorkerContextLogic {
BlockWorkerContextSendApi sendApi,
BlockWorkerPieces workerPieces, long superstep,
List<Writable> messages) {
- LOG.info("Worker executing " + workerPieces + " in " + superstep +
- " superstep");
+ if (BlockUtils.LOG_EXECUTION_STATUS.get(receiveApi.getConf())) {
+ LOG.info("Worker executing " + workerPieces + " in " + superstep +
+ " superstep");
+ }
this.sendApi = sendApi;
this.workerPieces = workerPieces;
if (workerPieces.getReceiver() != null) {
http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/SyntheticGraphInit.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/SyntheticGraphInit.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/SyntheticGraphInit.java
index 3de158a..cd485b4 100644
--- a/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/SyntheticGraphInit.java
+++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/test_setup/graphs/SyntheticGraphInit.java
@@ -58,7 +58,6 @@ public class SyntheticGraphInit<I extends WritableComparable,
this.edgeSupplier = null;
}
-
@Override
public void modifyGraph(NumericTestGraph<I, V, E> graph) {
GiraphConfiguration conf = graph.getConf();
@@ -84,11 +83,5 @@ public class SyntheticGraphInit<I extends WritableComparable,
i, j, edgeSupplier != null ? edgeSupplier.get() : null);
}
}
-
-// if (vertexModifier != null) {
-// for (int i = 0; i < numVertices; i++) {
-// vertexModifier.modifyVertexValue(i, graph.getVertex(i).getValue());
-// }
-// }
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/MultipleSimultanousMutationsTest.java
----------------------------------------------------------------------
diff --git a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/MultipleSimultanousMutationsTest.java b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/MultipleSimultanousMutationsTest.java
new file mode 100644
index 0000000..e2c316e
--- /dev/null
+++ b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/MultipleSimultanousMutationsTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.block_app.framework;
+
+import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi;
+import org.apache.giraph.block_app.framework.block.Block;
+import org.apache.giraph.block_app.framework.piece.Piece;
+import org.apache.giraph.block_app.framework.piece.interfaces.VertexSender;
+import org.apache.giraph.block_app.test_setup.NumericTestGraph;
+import org.apache.giraph.block_app.test_setup.TestGraphChecker;
+import org.apache.giraph.block_app.test_setup.TestGraphModifier;
+import org.apache.giraph.block_app.test_setup.TestGraphUtils;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.edge.ReusableEdge;
+import org.apache.giraph.graph.Vertex;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests that when a vertex receives multiple simultaneous mutations
+ * (e.g. both a message and an add-edge request sent to the same target
+ * vertex) all of them are correctly processed.
+ */
+public class MultipleSimultanousMutationsTest {
+  @Test
+  public void createVertexOnMsgsTest() throws Exception {
+    TestGraphUtils.runTest(
+        new TestGraphModifier<LongWritable, Writable, LongWritable>() {
+          @Override
+          public void modifyGraph(NumericTestGraph<LongWritable, Writable, LongWritable> graph) {
+            graph.addEdge(1, 2, 2);
+          }
+        },
+        new TestGraphChecker<LongWritable, Writable, LongWritable>() {
+          @Override
+          public void checkOutput(NumericTestGraph<LongWritable, Writable, LongWritable> graph) {
+            Assert.assertEquals(1, graph.getVertex(1).getNumEdges());
+            Assert.assertNull(graph.getVertex(1).getEdgeValue(new LongWritable(-1)));
+            Assert.assertEquals(2, graph.getVertex(1).getEdgeValue(new LongWritable(2)).get());
+
+            Assert.assertEquals(1, graph.getVertex(2).getNumEdges());
+            Assert.assertEquals(-1, graph.getVertex(2).getEdgeValue(new LongWritable(-1)).get());
+          }
+        },
+        new BulkConfigurator() {
+          @Override
+          public void configure(GiraphConfiguration conf) {
+            BlockUtils.setBlockFactoryClass(conf, SendingAndAddEdgeBlockFactory.class);
+          }
+        });
+  }
+
+  public static class SendingAndAddEdgeBlockFactory extends TestLongNullNullBlockFactory {
+    @Override
+    protected Class<? extends Writable> getEdgeValueClass(GiraphConfiguration conf) {
+      return LongWritable.class;
+    }
+
+    @Override
+    public Block createBlock(GiraphConfiguration conf) {
+      return new Piece<LongWritable, Writable, LongWritable, NullWritable, Object>() {
+        @Override
+        protected Class<NullWritable> getMessageClass() {
+          return NullWritable.class;
+        }
+
+        @Override
+        public VertexSender<LongWritable, Writable, LongWritable> getVertexSender(
+            final BlockWorkerSendApi<LongWritable, Writable, LongWritable, NullWritable> workerApi,
+            Object executionStage) {
+          final ReusableEdge<LongWritable, LongWritable> reusableEdge = workerApi.getConf().createReusableEdge();
+          reusableEdge.setTargetVertexId(new LongWritable(-1));
+          reusableEdge.setValue(new LongWritable(-1));
+          return new VertexSender<LongWritable, Writable, LongWritable>() {
+            @Override
+            public void vertexSend(Vertex<LongWritable, Writable, LongWritable> vertex) {
+              for (Edge<LongWritable, LongWritable> edge : vertex.getEdges()) {
+                workerApi.addEdgeRequest(edge.getTargetVertexId(), reusableEdge);
+                workerApi.sendMessage(edge.getTargetVertexId(), NullWritable.get());
+              }
+            }
+          };
+        }
+      };
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
index e74703e..8ad3767 100644
--- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
+++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java
@@ -873,7 +873,7 @@ public interface GiraphConstants {
"Overrides default partition count calculation if not -1");
/** Vertex key space size for
- * {@link org.apache.giraph.partition.SimpleWorkerPartitioner}
+ * {@link org.apache.giraph.partition.WorkerGraphPartitionerImpl}
*/
String PARTITION_VERTEX_KEY_SPACE_SIZE = "giraph.vertexKeySpaceSize";
http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-core/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java
deleted file mode 100644
index ebc62f6..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.integration;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.worker.WorkerInfo;
-import org.apache.giraph.partition.BasicPartitionOwner;
-import org.apache.giraph.partition.HashMasterPartitioner;
-import org.apache.giraph.partition.HashPartitionerFactory;
-import org.apache.giraph.partition.MasterGraphPartitioner;
-import org.apache.giraph.partition.PartitionOwner;
-import org.apache.giraph.partition.PartitionStats;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
-
-/**
- * Example graph partitioner that builds on {@link HashMasterPartitioner} to
- * send the partitions to the worker that matches the superstep. It is for
- * testing only and should never be used in practice.
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- */
-@SuppressWarnings("rawtypes")
-public class SuperstepHashPartitionerFactory<I extends WritableComparable,
- V extends Writable, E extends Writable>
- extends HashPartitionerFactory<I, V, E> {
- /**
- * Changes the {@link HashMasterPartitioner} to make ownership of the
- * partitions based on a superstep. For testing only as it is totally
- * unbalanced.
- *
- * @param <I> vertex id
- * @param <V> vertex data
- * @param <E> edge data
- */
- private static class SuperstepMasterPartition<I extends WritableComparable,
- V extends Writable, E extends Writable>
- extends HashMasterPartitioner<I, V, E> {
- /** Class logger */
- private static Logger LOG =
- Logger.getLogger(SuperstepMasterPartition.class);
-
- /**
- * Construction with configuration.
- *
- * @param conf Configuration to be stored.
- */
- public SuperstepMasterPartition(ImmutableClassesGiraphConfiguration conf) {
- super(conf);
- }
-
- @Override
- public Collection<PartitionOwner> generateChangedPartitionOwners(
- Collection<PartitionStats> allPartitionStatsList,
- Collection<WorkerInfo> availableWorkerInfos,
- int maxWorkers,
- long superstep) {
- // Assign all the partitions to
- // superstep mod availableWorkerInfos
- // Guaranteed to be different if the workers (and their order)
- // do not change
- long workerIndex = superstep % availableWorkerInfos.size();
- int i = 0;
- WorkerInfo chosenWorkerInfo = null;
- for (WorkerInfo workerInfo : availableWorkerInfos) {
- if (workerIndex == i) {
- chosenWorkerInfo = workerInfo;
- }
- ++i;
- }
- if (LOG.isInfoEnabled()) {
- LOG.info("generateChangedPartitionOwners: Chosen worker " +
- "for superstep " + superstep + " is " +
- chosenWorkerInfo);
- }
-
- List<PartitionOwner> partitionOwnerList = new ArrayList<PartitionOwner>();
- for (PartitionOwner partitionOwner :
- getCurrentPartitionOwners()) {
- WorkerInfo prevWorkerinfo =
- partitionOwner.getWorkerInfo().equals(chosenWorkerInfo) ?
- null : partitionOwner.getWorkerInfo();
- PartitionOwner tmpPartitionOwner =
- new BasicPartitionOwner(partitionOwner.getPartitionId(),
- chosenWorkerInfo,
- prevWorkerinfo,
- null);
- partitionOwnerList.add(tmpPartitionOwner);
- LOG.info("partition owner was " + partitionOwner +
- ", new " + tmpPartitionOwner);
- }
- setPartitionOwnerList(partitionOwnerList);
- return partitionOwnerList;
- }
- }
-
- @Override
- public MasterGraphPartitioner<I, V, E>
- createMasterGraphPartitioner() {
- return new SuperstepMasterPartition<I, V, E>(getConf());
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-core/src/main/java/org/apache/giraph/integration/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/integration/package-info.java b/giraph-core/src/main/java/org/apache/giraph/integration/package-info.java
deleted file mode 100644
index 4c6ae30..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/integration/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Package of all helper integration test objects.
- */
-package org.apache.giraph.integration;
http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactory.java
index c5e2f3e..5726d25 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactory.java
@@ -18,7 +18,7 @@
package org.apache.giraph.partition;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
import org.apache.giraph.worker.LocalData;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -26,34 +26,98 @@ import org.apache.hadoop.io.WritableComparable;
/**
* Defines the partitioning framework for this application.
*
- * @param <I> Vertex index value
+ * Implements all {@link GraphPartitionerFactoryInterface} logic
+ * on top of two abstract functions which define the partitioning scheme:
+ * - which partition a vertex should be in ({@link #getPartition}), and
+ * - which worker each partition should belong to ({@link #getWorker}).
+ *
+ * @param <I> Vertex id value
* @param <V> Vertex value
* @param <E> Edge value
*/
@SuppressWarnings("rawtypes")
-public interface GraphPartitionerFactory<I extends WritableComparable,
- V extends Writable, E extends Writable> extends
- ImmutableClassesGiraphConfigurable<I, V, E> {
+public abstract class GraphPartitionerFactory<I extends WritableComparable,
+ V extends Writable, E extends Writable>
+ extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
+ implements GraphPartitionerFactoryInterface<I, V, E> {
+ @Override
+ public void initialize(LocalData<I, V, E, ? extends Writable> localData) {
+ }
+
+ @Override
+ public final MasterGraphPartitioner<I, V, E> createMasterGraphPartitioner() {
+ return new MasterGraphPartitionerImpl<I, V, E>(getConf()) {
+ @Override
+ protected int getWorkerIndex(int partition, int partitionCount,
+ int workerCount) {
+ return GraphPartitionerFactory.this.getWorker(
+ partition, partitionCount, workerCount);
+ }
+ };
+ }
+
+ @Override
+ public final WorkerGraphPartitioner<I, V, E> createWorkerGraphPartitioner() {
+ return new WorkerGraphPartitionerImpl<I, V, E>() {
+ @Override
+ protected int getPartitionIndex(I id, int partitionCount,
+ int workerCount) {
+ return GraphPartitionerFactory.this.getPartition(id,
+ partitionCount, workerCount);
+ }
+ };
+ }
+
+ /**
+ * Calculates which partition the vertex with the given id belongs to,
+ * as an index from the interval [0, partitionCount).
+ *
+ * @param id Vertex id
+ * @param partitionCount Number of partitions
+ * @param workerCount Number of workers
+ * @return partition index in [0, partitionCount)
+ */
+ public abstract int getPartition(I id, int partitionCount,
+ int workerCount);
/**
- * Use some local data present in the worker
+ * Calculates the worker that should be responsible for the given partition.
*
- * @param localData localData present in the worker
+ * @param partition Partition index
+ * @param partitionCount Number of partitions
+ * @param workerCount Number of workers
+ * @return index of the worker responsible for the given partition
*/
- void initialize(LocalData<I, V, E, ? extends Writable> localData);
+ public abstract int getWorker(
+ int partition, int partitionCount, int workerCount);
+
/**
- * Create the {@link MasterGraphPartitioner} used by the master.
- * Instantiated once by the master and reused.
+ * Utility function: splits [0, max) into {@code partitions} equally sized
+ * ranges and returns the one containing {@code value % max}.
*
- * @return Instantiated master graph partitioner
+ * @param value Value for which partition is requested
+ * @param max Exclusive upper bound of the value range
+ * @param partitions Number of partitions, equally sized.
+ * @return Partition index, clamped to [0, partitions - 1]
*/
- MasterGraphPartitioner<I, V, E> createMasterGraphPartitioner();
+ public static int getPartitionInRange(int value, int max, int partitions) {
+ double keyRange = ((double) max) / partitions;
+ int part = (int) ((value % max) / keyRange);
+ return Math.max(0, Math.min(partitions - 1, part));
+ }
/**
- * Create the {@link WorkerGraphPartitioner} used by the worker.
- * Instantiated once by every worker and reused.
+ * Long variant: splits [0, max) into {@code partitions} equally sized
+ * ranges and returns the one containing {@code value % max}.
*
- * @return Instantiated worker graph partitioner
+ * @param value Value for which partition is requested
+ * @param max Exclusive upper bound of the value range
+ * @param partitions Number of partitions, equally sized.
+ * @return Partition index, clamped to [0, partitions - 1]
*/
- WorkerGraphPartitioner<I, V, E> createWorkerGraphPartitioner();
+ public static int getPartitionInRange(long value, long max, int partitions) {
+ double keyRange = ((double) max) / partitions;
+ int part = (int) ((value % max) / keyRange);
+ return Math.max(0, Math.min(partitions - 1, part));
+ }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactoryInterface.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactoryInterface.java b/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactoryInterface.java
new file mode 100644
index 0000000..5551100
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/GraphPartitionerFactoryInterface.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
+import org.apache.giraph.worker.LocalData;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Defines the partitioning framework: creates master and worker partitioners.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+@SuppressWarnings("rawtypes")
+public interface GraphPartitionerFactoryInterface<I extends WritableComparable,
+ V extends Writable, E extends Writable> extends
+ ImmutableClassesGiraphConfigurable<I, V, E> {
+
+ /**
+ * Use some local data present in the worker
+ *
+ * @param localData localData present in the worker
+ */
+ void initialize(LocalData<I, V, E, ? extends Writable> localData);
+ /**
+ * Create the {@link MasterGraphPartitioner} used by the master.
+ * Instantiated once by the master and reused.
+ *
+ * @return Instantiated master graph partitioner
+ */
+ MasterGraphPartitioner<I, V, E> createMasterGraphPartitioner();
+
+ /**
+ * Create the {@link WorkerGraphPartitioner} used by the worker.
+ * Instantiated once by every worker and reused.
+ *
+ * @return Instantiated worker graph partitioner
+ */
+ WorkerGraphPartitioner<I, V, E> createWorkerGraphPartitioner();
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java
deleted file mode 100644
index 607347d..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/partition/HashMasterPartitioner.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.partition;
-
-import com.google.common.collect.Lists;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.worker.WorkerInfo;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * Master will execute a hash based partitioning.
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- */
-@SuppressWarnings("rawtypes")
-public class HashMasterPartitioner<I extends WritableComparable,
- V extends Writable, E extends Writable> implements
- MasterGraphPartitioner<I, V, E> {
- /** Class logger */
- private static Logger LOG = Logger.getLogger(HashMasterPartitioner.class);
- /** Provided configuration */
- private ImmutableClassesGiraphConfiguration conf;
- /** Save the last generated partition owner list */
- private List<PartitionOwner> partitionOwnerList;
-
- /**
- * Constructor.
- *
- *@param conf Configuration used.
- */
- public HashMasterPartitioner(ImmutableClassesGiraphConfiguration conf) {
- this.conf = conf;
- }
-
- @Override
- public Collection<PartitionOwner> createInitialPartitionOwners(
- Collection<WorkerInfo> availableWorkerInfos, int maxWorkers) {
- int partitionCount = PartitionUtils.computePartitionCount(
- availableWorkerInfos.size(), conf);
- List<PartitionOwner> ownerList = new ArrayList<PartitionOwner>();
- Iterator<WorkerInfo> workerIt = availableWorkerInfos.iterator();
- for (int i = 0; i < partitionCount; ++i) {
- PartitionOwner owner = new BasicPartitionOwner(i, workerIt.next());
- if (!workerIt.hasNext()) {
- workerIt = availableWorkerInfos.iterator();
- }
- ownerList.add(owner);
- }
- this.partitionOwnerList = ownerList;
- return ownerList;
- }
-
- @Override
- public void setPartitionOwners(Collection<PartitionOwner> partitionOwners) {
- this.partitionOwnerList = Lists.newArrayList(partitionOwners);
- }
-
- @Override
- public Collection<PartitionOwner> getCurrentPartitionOwners() {
- return partitionOwnerList;
- }
-
- /**
- * Subclasses can set the partition owner list.
- *
- * @param partitionOwnerList New partition owner list.
- */
- protected void setPartitionOwnerList(List<PartitionOwner>
- partitionOwnerList) {
- this.partitionOwnerList = partitionOwnerList;
- }
-
- @Override
- public Collection<PartitionOwner> generateChangedPartitionOwners(
- Collection<PartitionStats> allPartitionStatsList,
- Collection<WorkerInfo> availableWorkerInfos,
- int maxWorkers,
- long superstep) {
- return PartitionBalancer.balancePartitionsAcrossWorkers(
- conf,
- partitionOwnerList,
- allPartitionStatsList,
- availableWorkerInfos);
- }
-
- @Override
- public PartitionStats createPartitionStats() {
- return new PartitionStats();
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-core/src/main/java/org/apache/giraph/partition/HashPartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashPartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashPartitionerFactory.java
index 221e50d..17aec51 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/HashPartitionerFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/HashPartitionerFactory.java
@@ -15,11 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.giraph.partition;
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
-import org.apache.giraph.worker.LocalData;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -27,27 +24,21 @@ import org.apache.hadoop.io.WritableComparable;
* Divides the vertices into partitions by their hash code using a simple
* round-robin hash for great balancing if given a random hash code.
*
- * @param <I> Vertex index value
+ * @param <I> Vertex id value
* @param <V> Vertex value
* @param <E> Edge value
*/
-@SuppressWarnings("rawtypes")
public class HashPartitionerFactory<I extends WritableComparable,
- V extends Writable, E extends Writable>
- extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
- implements GraphPartitionerFactory<I, V, E> {
-
- @Override
- public void initialize(LocalData<I, V, E, ? extends Writable> localData) {
- }
+ V extends Writable, E extends Writable>
+ extends GraphPartitionerFactory<I, V, E> {
@Override
- public MasterGraphPartitioner<I, V, E> createMasterGraphPartitioner() {
- return new HashMasterPartitioner<I, V, E>(getConf());
+ public int getPartition(I id, int partitionCount, int workerCount) {
+ return Math.abs(id.hashCode() % partitionCount);
}
@Override
- public WorkerGraphPartitioner<I, V, E> createWorkerGraphPartitioner() {
- return new HashWorkerPartitioner<I, V, E>();
+ public int getWorker(int partition, int partitionCount, int workerCount) {
+ return partition % workerCount;
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-core/src/main/java/org/apache/giraph/partition/HashRangePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashRangePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashRangePartitionerFactory.java
index 5f7ee40..ef65800 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/HashRangePartitionerFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/HashRangePartitionerFactory.java
@@ -18,11 +18,11 @@
package org.apache.giraph.partition;
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
-import org.apache.giraph.worker.LocalData;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import com.google.common.primitives.UnsignedInts;
+
/**
* Divides the vertices into partitions by their hash code using ranges of the
* hash space.
@@ -33,21 +33,22 @@ import org.apache.hadoop.io.WritableComparable;
*/
@SuppressWarnings("rawtypes")
public class HashRangePartitionerFactory<I extends WritableComparable,
- V extends Writable, E extends Writable>
- extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
- implements GraphPartitionerFactory<I, V, E> {
+ V extends Writable, E extends Writable>
+ extends GraphPartitionerFactory<I, V, E> {
- @Override
- public void initialize(LocalData<I, V, E, ? extends Writable> localData) {
- }
+ /** 2^32: every unsigned 32-bit hash code is strictly smaller than this. */
+ private static final long HASH_LIMIT = 2L * Integer.MAX_VALUE + 2L;
@Override
- public MasterGraphPartitioner<I, V, E> createMasterGraphPartitioner() {
- return new HashMasterPartitioner<I, V, E>(getConf());
+ public int getPartition(I id, int partitionCount, int workerCount) {
+ long unsignedHashCode = UnsignedInts.toLong(id.hashCode());
+ // unsignedHashCode is in [0, HASH_LIMIT), so the result is in
+ // [0, partitionCount): HASH_LIMIT - 1 maps to partitionCount - 1, 0 to 0.
+ return (int) ((unsignedHashCode * partitionCount) / HASH_LIMIT);
}
@Override
- public WorkerGraphPartitioner<I, V, E> createWorkerGraphPartitioner() {
- return new HashRangeWorkerPartitioner<I, V, E>();
+ public int getWorker(int partition, int partitionCount, int workerCount) {
+ return partition % workerCount;
}
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-core/src/main/java/org/apache/giraph/partition/HashRangeWorkerPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashRangeWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashRangeWorkerPartitioner.java
deleted file mode 100644
index 81c3d7d..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/partition/HashRangeWorkerPartitioner.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.partition;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import com.google.common.primitives.UnsignedInts;
-
-/**
- * Implements range-based partitioning from the id hash code.
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- */
-@SuppressWarnings("rawtypes")
-public class HashRangeWorkerPartitioner<I extends WritableComparable,
- V extends Writable, E extends Writable>
- extends HashWorkerPartitioner<I, V, E> {
- /** A transformed hashCode() must be strictly smaller than this. */
- private static final long HASH_LIMIT = 2L * Integer.MAX_VALUE + 2L;
-
- @Override
- public PartitionOwner getPartitionOwner(I vertexId) {
- long unsignedHashCode = UnsignedInts.toLong(vertexId.hashCode());
- // The reader can verify that unsignedHashCode of HASH_LIMIT - 1 yields
- // index of size - 1, and unsignedHashCode of 0 yields index of 0.
- int index = (int)
- ((unsignedHashCode * getPartitionOwners().size()) / HASH_LIMIT);
- return partitionOwnerList.get(index);
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java
deleted file mode 100644
index 12aa417..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/partition/HashWorkerPartitioner.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.partition;
-
-import com.google.common.collect.Lists;
-import org.apache.giraph.worker.WorkerInfo;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.util.Collection;
-import java.util.List;
-
-/**
- * Implements hash-based partitioning from the id hash code.
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- */
-@SuppressWarnings("rawtypes")
-public class HashWorkerPartitioner<I extends WritableComparable,
- V extends Writable, E extends Writable>
- implements WorkerGraphPartitioner<I, V, E> {
- /**
- * Mapping of the vertex ids to {@link PartitionOwner}.
- */
- protected List<PartitionOwner> partitionOwnerList =
- Lists.newArrayList();
-
- @Override
- public PartitionOwner createPartitionOwner() {
- return new BasicPartitionOwner();
- }
-
- @Override
- public PartitionOwner getPartitionOwner(I vertexId) {
- return partitionOwnerList.get(
- Math.abs(vertexId.hashCode() % partitionOwnerList.size()));
- }
-
- @Override
- public Collection<PartitionStats> finalizePartitionStats(
- Collection<PartitionStats> workerPartitionStats,
- PartitionStore<I, V, E> partitionStore) {
- // No modification necessary
- return workerPartitionStats;
- }
-
- @Override
- public PartitionExchange updatePartitionOwners(
- WorkerInfo myWorkerInfo,
- Collection<? extends PartitionOwner> masterSetPartitionOwners) {
- return PartitionBalancer.updatePartitionOwners(partitionOwnerList,
- myWorkerInfo, masterSetPartitionOwners);
- }
-
- @Override
- public Collection<? extends PartitionOwner> getPartitionOwners() {
- return partitionOwnerList;
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-core/src/main/java/org/apache/giraph/partition/LongMappingStorePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/LongMappingStorePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/LongMappingStorePartitionerFactory.java
index e129050..98d1285 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/LongMappingStorePartitionerFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/LongMappingStorePartitionerFactory.java
@@ -31,7 +31,7 @@ import org.apache.log4j.Logger;
*/
@SuppressWarnings("unchecked")
public class LongMappingStorePartitionerFactory<V extends Writable,
- E extends Writable> extends SimplePartitionerFactory<LongWritable, V, E> {
+ E extends Writable> extends GraphPartitionerFactory<LongWritable, V, E> {
/** Logger Instance */
private static final Logger LOG = Logger.getLogger(
LongMappingStorePartitionerFactory.class);
@@ -46,14 +46,14 @@ public class LongMappingStorePartitionerFactory<V extends Writable,
}
@Override
- protected int getPartition(LongWritable id, int partitionCount,
+ public int getPartition(LongWritable id, int partitionCount,
int workerCount) {
return localData.getMappingStoreOps().getPartition(id,
partitionCount, workerCount);
}
@Override
- protected int getWorker(int partition, int partitionCount, int workerCount) {
+ public int getWorker(int partition, int partitionCount, int workerCount) {
int numRows = partitionCount / workerCount;
numRows = (numRows * workerCount == partitionCount) ? numRows : numRows + 1;
return partition / numRows;
http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-core/src/main/java/org/apache/giraph/partition/MasterGraphPartitionerImpl.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/MasterGraphPartitionerImpl.java b/giraph-core/src/main/java/org/apache/giraph/partition/MasterGraphPartitionerImpl.java
new file mode 100644
index 0000000..b9916dc
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/MasterGraphPartitionerImpl.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Implements all MasterGraphPartitioner logic on top of a single abstract
+ * method, {@link #getWorkerIndex}, which maps each partition to a worker.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ */
+public abstract class MasterGraphPartitionerImpl<I extends WritableComparable,
+ V extends Writable, E extends Writable>
+ implements MasterGraphPartitioner<I, V, E> {
+ /** Configuration used for partition counting and rebalancing */
+ private final ImmutableClassesGiraphConfiguration<I, V, E> conf;
+ /** Last partition owner list that was generated or explicitly set */
+ private List<PartitionOwner> partitionOwnerList;
+
+ /**
+ * Constructor.
+ *
+ * @param conf
+ * Configuration used to compute and rebalance partitions.
+ */
+ public MasterGraphPartitionerImpl(
+ ImmutableClassesGiraphConfiguration<I, V, E> conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public Collection<PartitionOwner> createInitialPartitionOwners(
+ Collection<WorkerInfo> availableWorkerInfos, int maxWorkers) {
+ int partitionCount = PartitionUtils.computePartitionCount(
+ availableWorkerInfos.size(), conf);
+ ArrayList<WorkerInfo> workerList =
+ new ArrayList<WorkerInfo>(availableWorkerInfos);
+ // Indexable copy: getWorkerIndex() results are used as indices into it.
+ partitionOwnerList = new ArrayList<PartitionOwner>();
+ for (int i = 0; i < partitionCount; i++) {
+ partitionOwnerList.add(new BasicPartitionOwner(i, workerList.get(
+ getWorkerIndex(i, partitionCount, workerList.size()))));
+ }
+
+ return partitionOwnerList;
+ }
+
+ @Override
+ public void setPartitionOwners(Collection<PartitionOwner> partitionOwners) {
+ partitionOwnerList = Lists.newArrayList(partitionOwners);
+ }
+
+ @Override
+ public Collection<PartitionOwner> generateChangedPartitionOwners(
+ Collection<PartitionStats> allPartitionStatsList,
+ Collection<WorkerInfo> availableWorkers,
+ int maxWorkers,
+ long superstep) {
+ return PartitionBalancer.balancePartitionsAcrossWorkers(conf,
+ partitionOwnerList, allPartitionStatsList, availableWorkers);
+ }
+
+ @Override
+ public Collection<PartitionOwner> getCurrentPartitionOwners() {
+ return partitionOwnerList;
+ }
+
+ @Override
+ public PartitionStats createPartitionStats() {
+ return new PartitionStats();
+ }
+
+ /**
+ * Calculates the index of the worker responsible for the given partition.
+ *
+ * @param partition Current partition
+ * @param partitionCount Number of partitions
+ * @param workerCount Number of workers
+ * @return index of the responsible worker; must be in [0, workerCount)
+ */
+ protected abstract int getWorkerIndex(
+ int partition, int partitionCount, int workerCount);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java
index 5dd580b..6d1dcb1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java
@@ -32,13 +32,13 @@ import org.apache.hadoop.io.Writable;
* @param <E> Edge value type
*/
public class SimpleIntRangePartitionerFactory<V extends Writable,
- E extends Writable> extends SimplePartitionerFactory<IntWritable, V, E> {
+ E extends Writable> extends GraphPartitionerFactory<IntWritable, V, E> {
/** Vertex key space size. */
private int keySpaceSize;
@Override
- protected int getPartition(IntWritable id, int partitionCount,
+ public int getPartition(IntWritable id, int partitionCount,
int workerCount) {
return getPartition(id, partitionCount);
}
@@ -56,7 +56,7 @@ public class SimpleIntRangePartitionerFactory<V extends Writable,
}
@Override
- protected int getWorker(int partition, int partitionCount, int workerCount) {
+ public int getWorker(int partition, int partitionCount, int workerCount) {
return getPartitionInRange(partition, partitionCount, workerCount);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java
index e637e16..9dee3d1 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java
@@ -32,13 +32,13 @@ import org.apache.hadoop.io.Writable;
* @param <E> Edge value type
*/
public class SimpleLongRangePartitionerFactory<V extends Writable,
- E extends Writable> extends SimplePartitionerFactory<LongWritable, V, E> {
+ E extends Writable> extends GraphPartitionerFactory<LongWritable, V, E> {
/** Vertex key space size. */
private long keySpaceSize;
@Override
- protected int getPartition(LongWritable id, int partitionCount,
+ public int getPartition(LongWritable id, int partitionCount,
int workerCount) {
return getPartition(id, partitionCount);
}
@@ -56,7 +56,7 @@ public class SimpleLongRangePartitionerFactory<V extends Writable,
}
@Override
- protected int getWorker(int partition, int partitionCount, int workerCount) {
+ public int getWorker(int partition, int partitionCount, int workerCount) {
return getPartitionInRange(partition, partitionCount, workerCount);
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java
deleted file mode 100644
index 638dacf..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.partition;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import com.google.common.collect.Lists;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.worker.WorkerInfo;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
-
-/**
- * Abstracts and implements all MasterGraphPartitioner logic on top of a single
- * user function - getWorkerIndex.
- *
- * @param <I> Vertex id type
- * @param <V> Vertex value type
- * @param <E> Edge value type
- */
-public abstract class SimpleMasterPartitioner<I extends WritableComparable,
- V extends Writable, E extends Writable>
- implements MasterGraphPartitioner<I, V, E> {
- /** Class logger */
- private static Logger LOG = Logger.getLogger(HashMasterPartitioner.class);
- /** Provided configuration */
- private ImmutableClassesGiraphConfiguration conf;
- /** Save the last generated partition owner list */
- private List<PartitionOwner> partitionOwnerList;
-
- /**
- * Constructor.
- *
- * @param conf
- * Configuration used.
- */
- public SimpleMasterPartitioner(ImmutableClassesGiraphConfiguration conf) {
- this.conf = conf;
- }
-
- @Override
- public Collection<PartitionOwner> createInitialPartitionOwners(
- Collection<WorkerInfo> availableWorkerInfos, int maxWorkers) {
- int partitionCount = PartitionUtils.computePartitionCount(
- availableWorkerInfos.size(), conf);
- ArrayList<WorkerInfo> workerList =
- new ArrayList<WorkerInfo>(availableWorkerInfos);
-
- partitionOwnerList = new ArrayList<PartitionOwner>();
- for (int i = 0; i < partitionCount; i++) {
- partitionOwnerList.add(new BasicPartitionOwner(i, workerList.get(
- getWorkerIndex(i, partitionCount, workerList.size()))));
- }
-
- return partitionOwnerList;
- }
-
- @Override
- public void setPartitionOwners(Collection<PartitionOwner> partitionOwners) {
- partitionOwnerList = Lists.newArrayList(partitionOwners);
- }
-
- @Override
- public Collection<PartitionOwner> generateChangedPartitionOwners(
- Collection<PartitionStats> allPartitionStatsList,
- Collection<WorkerInfo> availableWorkers,
- int maxWorkers,
- long superstep) {
- return PartitionBalancer.balancePartitionsAcrossWorkers(conf,
- partitionOwnerList, allPartitionStatsList, availableWorkers);
- }
-
- @Override
- public Collection<PartitionOwner> getCurrentPartitionOwners() {
- return partitionOwnerList;
- }
-
- @Override
- public PartitionStats createPartitionStats() {
- return new PartitionStats();
- }
-
- /**
- * Calculates worker that should be responsible for passed partition.
- *
- * @param partition Current partition
- * @param partitionCount Number of partitions
- * @param workerCount Number of workers
- * @return index of worker responsible for current partition
- */
- protected abstract int getWorkerIndex(
- int partition, int partitionCount, int workerCount);
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/ca36f1d4/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionerFactory.java
deleted file mode 100644
index 1e29846..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionerFactory.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.partition;
-
-import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
-import org.apache.giraph.worker.LocalData;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Abstracts and implements all GraphPartitionerFactory logic on top of two
- * functions which define the partitioning scheme:
- * - which partition a vertex should be in ({@link #getPartition}), and
- * - which worker should own each partition ({@link #getWorker}).
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- */
-public abstract class SimplePartitionerFactory<I extends WritableComparable,
-    V extends Writable, E extends Writable>
-    extends DefaultImmutableClassesGiraphConfigurable<I, V, E>
-    implements GraphPartitionerFactory<I, V, E> {
-
-  /**
-   * No-op: the partitioning scheme is fully determined by
-   * {@link #getPartition} and {@link #getWorker}, so local data is unused.
-   *
-   * @param localData ignored
-   */
-  @Override
-  public void initialize(LocalData<I, V, E, ? extends Writable> localData) {
-  }
-
-  /**
-   * Creates a master partitioner whose partition-to-worker mapping is
-   * delegated to {@link #getWorker}.
-   *
-   * @return master graph partitioner backed by this factory
-   */
-  @Override
-  public final MasterGraphPartitioner<I, V, E> createMasterGraphPartitioner() {
-    return new SimpleMasterPartitioner<I, V, E>(getConf()) {
-      @Override
-      protected int getWorkerIndex(int partition, int partitionCount,
-          int workerCount) {
-        return SimplePartitionerFactory.this.getWorker(
-            partition, partitionCount, workerCount);
-      }
-    };
-  }
-
-  /**
-   * Creates a worker partitioner whose vertex-to-partition mapping is
-   * delegated to {@link #getPartition}.
-   *
-   * @return worker graph partitioner backed by this factory
-   */
-  @Override
-  public final WorkerGraphPartitioner<I, V, E> createWorkerGraphPartitioner() {
-    return new SimpleWorkerPartitioner<I, V, E>() {
-      @Override
-      protected int getPartitionIndex(I id, int partitionCount,
-          int workerCount) {
-        return SimplePartitionerFactory.this.getPartition(id,
-            partitionCount, workerCount);
-      }
-    };
-  }
-
-  /**
-   * Calculates which partition the given vertex belongs to, from the
-   * interval [0, partitionCount).
-   *
-   * @param id Vertex id
-   * @param partitionCount Number of partitions
-   * @param workerCount Number of workers
-   * @return partition index in [0, partitionCount)
-   */
-  protected abstract int getPartition(I id, int partitionCount,
-      int workerCount);
-
-  /**
-   * Calculates which worker should be responsible for the given partition.
-   *
-   * @param partition Current partition
-   * @param partitionCount Number of partitions
-   * @param workerCount Number of workers
-   * @return index of worker responsible for current partition
-   */
-  protected abstract int getWorker(
-      int partition, int partitionCount, int workerCount);
-
-  /**
-   * Utility function for calculating which of {@code partitions} equally
-   * sized sub-ranges of [0, max) a value belongs to.
-   *
-   * Values are first reduced modulo {@code max}. Java's {@code %} keeps the
-   * sign of the dividend, so negative values are clamped to partition 0.
-   *
-   * @param value Value for which partition is requested
-   * @param max Exclusive upper bound of the value range; must be positive
-   * @param partitions Number of partitions, equally sized; must be positive
-   * @return Index of partition where value belongs, in [0, partitions)
-   * @throws IllegalArgumentException if {@code max} or {@code partitions}
-   *         is not positive
-   */
-  public static int getPartitionInRange(int value, int max, int partitions) {
-    // Widening int to long is lossless and int/long remainder agree for all
-    // int inputs, so share one implementation instead of duplicating it.
-    return getPartitionInRange((long) value, (long) max, partitions);
-  }
-
-  /**
-   * Utility function for calculating which of {@code partitions} equally
-   * sized sub-ranges of [0, max) a value belongs to.
-   *
-   * Values are first reduced modulo {@code max}. Java's {@code %} keeps the
-   * sign of the dividend, so negative values are clamped to partition 0.
-   *
-   * @param value Value for which partition is requested
-   * @param max Exclusive upper bound of the value range; must be positive
-   * @param partitions Number of partitions, equally sized; must be positive
-   * @return Index of partition where value belongs, in [0, partitions)
-   * @throws IllegalArgumentException if {@code max} or {@code partitions}
-   *         is not positive
-   */
-  public static int getPartitionInRange(long value, long max, int partitions) {
-    // Previously max == 0 failed with "/ by zero" and non-positive
-    // partitions (or negative max) silently produced meaningless indices.
-    if (max <= 0) {
-      throw new IllegalArgumentException("max must be positive, got " + max);
-    }
-    if (partitions <= 0) {
-      throw new IllegalArgumentException(
-          "partitions must be positive, got " + partitions);
-    }
-    double keyRange = ((double) max) / partitions;
-    int part = (int) ((value % max) / keyRange);
-    // Keep the result inside [0, partitions): negative remainders map to 0,
-    // and the upper clamp guards against floating-point rounding.
-    return Math.max(0, Math.min(partitions - 1, part));
-  }
-}