You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2014/01/08 21:04:42 UTC
git commit: updated refs/heads/trunk to cdb49fd
Updated Branches:
refs/heads/trunk 4ce0f6a0d -> cdb49fd5f
GIRAPH-785: Improve GraphPartitionerFactory usage (ikabiljo via majakabiljo)
Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/cdb49fd5
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/cdb49fd5
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/cdb49fd5
Branch: refs/heads/trunk
Commit: cdb49fd5f76027a4a66930e6b266ba79266c9852
Parents: 4ce0f6a
Author: Maja Kabiljo <ma...@fb.com>
Authored: Wed Jan 8 12:04:23 2014 -0800
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Wed Jan 8 12:04:23 2014 -0800
----------------------------------------------------------------------
CHANGELOG | 2 +
.../apache/giraph/partition/PartitionUtils.java | 15 ++-
.../partition/RangeMasterPartitioner.java | 42 -------
.../giraph/partition/RangePartitionOwner.java | 90 --------------
.../giraph/partition/RangePartitionStats.java | 68 -----------
.../partition/RangePartitionerFactory.java | 42 -------
.../apache/giraph/partition/RangeSplitHint.java | 72 -----------
.../partition/RangeWorkerPartitioner.java | 77 ------------
.../SimpleIntRangePartitionerFactory.java | 45 +++----
.../SimpleLongRangePartitionerFactory.java | 43 +++----
.../partition/SimpleMasterPartitioner.java | 106 ++++++++++++++++
.../partition/SimplePartitionerFactory.java | 122 +++++++++++++++++++
.../partition/SimpleRangeMasterPartitioner.java | 116 ------------------
.../partition/SimpleRangeWorkerPartitioner.java | 107 ----------------
.../partition/SimpleWorkerPartitioner.java | 85 +++++++++++++
.../SimpleRangePartitionFactoryTest.java | 117 ++++++++++++++++++
16 files changed, 473 insertions(+), 676 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index d0259e3..b29aa66 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
Giraph Change Log
Release 1.1.0 - unreleased
+ GIRAPH-785: Improve GraphPartitionerFactory usage (ikabiljo via majakabiljo)
+
GIRAPH-815: Exclude dependency and duplicate finder checks to profile we do not check (aching)
GIRAPH-798: Upgrade Giraph to Java7 and fix all dependencies (aching)
http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
index b055f4d..68bc2de 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
@@ -195,11 +195,20 @@ public class PartitionUtils {
}
int maxPartitions = getMaxPartitions(conf);
if (partitionCount > maxPartitions) {
+ // try to keep partitionCount divisible by number of workers
+ // in order to keep the balance
+ int reducedPartitions = (maxPartitions / availableWorkerInfos.size()) *
+ availableWorkerInfos.size();
+ if (reducedPartitions == 0) {
+ reducedPartitions = maxPartitions;
+ }
LOG.warn("computePartitionCount: " +
- "Reducing the partitionCount to " + maxPartitions +
- " from " + partitionCount);
- partitionCount = maxPartitions;
+ "Reducing the partitionCount to " + reducedPartitions +
+ " from " + partitionCount + " because of " + maxPartitions +
+ " limit");
+ partitionCount = reducedPartitions;
}
+
return partitionCount;
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/main/java/org/apache/giraph/partition/RangeMasterPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/RangeMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/RangeMasterPartitioner.java
deleted file mode 100644
index 3911a95..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/partition/RangeMasterPartitioner.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.partition;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Some functionality is provided, but this is meant for developers to
- * determine the partitioning based on the actual types of data. The
- * implementation of several methods are left to the developer who is trying
- * to control the amount of messages sent from one worker to another.
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- */
-@SuppressWarnings("rawtypes")
-public abstract class RangeMasterPartitioner<I extends WritableComparable,
- V extends Writable, E extends Writable> implements
- MasterGraphPartitioner<I, V, E> {
- @Override
- public PartitionStats createPartitionStats() {
- return new RangePartitionStats<I>();
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionOwner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionOwner.java b/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionOwner.java
deleted file mode 100644
index e7e03dc..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionOwner.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.partition;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.giraph.worker.WorkerInfo;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Added the max key index in to the {@link PartitionOwner}. Also can provide
- * a split hint if desired.
- *
- * @param <I> Vertex index type
- */
-@SuppressWarnings("rawtypes")
-public class RangePartitionOwner<I extends WritableComparable>
- extends BasicPartitionOwner {
- /** Max index for this partition */
- private I maxIndex;
-
- /**
- * Default constructor.
- */
- public RangePartitionOwner() { }
-
- /**
- * Constructor with the max index.
- *
- * @param maxIndex Max index of this partition.
- */
- public RangePartitionOwner(I maxIndex) {
- this.maxIndex = maxIndex;
- }
-
- /**
- * Get the maximum index of this partition owner.
- *
- * @return Maximum index.
- */
- public I getMaxIndex() {
- return maxIndex;
- }
-
- @Override
- public void readFields(DataInput input) throws IOException {
- super.readFields(input);
- maxIndex = (I) getConf().createVertexId();
- maxIndex.readFields(input);
- }
-
- @Override
- public void write(DataOutput output) throws IOException {
- super.write(output);
- maxIndex.write(output);
- }
-
- @Override
- public void writeWithWorkerIds(DataOutput output) throws IOException {
- super.writeWithWorkerIds(output);
- maxIndex.write(output);
- }
-
- @Override
- public void readFieldsWithWorkerIds(DataInput input,
- Map<Integer, WorkerInfo> workerInfoMap) throws IOException {
- super.readFieldsWithWorkerIds(input, workerInfoMap);
- maxIndex = (I) getConf().createVertexId();
- maxIndex.readFields(input);
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionStats.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionStats.java b/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionStats.java
deleted file mode 100644
index 73af816..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionStats.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.partition;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Same as {@link PartitionStats}, but also includes the hint for range-based
- * partitioning.
- *
- * @param <I> Vertex index type
- */
-@SuppressWarnings("rawtypes")
-public class RangePartitionStats<I extends WritableComparable>
- extends PartitionStats {
- /** Can be null if no hint, otherwise a splitting hint */
- private RangeSplitHint<I> hint;
-
- /**
- * Get the range split hint (if any)
- *
- * @return Hint of how to split the range if desired, null otherwise
- */
- public RangeSplitHint<I> getRangeSplitHint() {
- return hint;
- }
-
- @Override
- public void readFields(DataInput input) throws IOException {
- super.readFields(input);
- boolean hintExists = input.readBoolean();
- if (hintExists) {
- hint = new RangeSplitHint<I>();
- hint.readFields(input);
- } else {
- hint = null;
- }
- }
-
- @Override
- public void write(DataOutput output) throws IOException {
- super.write(output);
- output.writeBoolean(hint != null);
- if (hint != null) {
- hint.write(output);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionerFactory.java
deleted file mode 100644
index 2ec4d4a..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/partition/RangePartitionerFactory.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.partition;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Range partitioning will split the vertices by a key range based on a generic
- * type. This allows vertices that have some locality with the vertex ids
- * to reduce the amount of messages sent. The tradeoffs are that
- * range partitioning is more susceptible to hot spots if the keys
- * are not randomly distributed. Another negative is the user must implement
- * some of the functionality around how to split the key range.
- *
- * See {@link RangeWorkerPartitioner}
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- */
-@SuppressWarnings("rawtypes")
-public abstract class RangePartitionerFactory<I extends WritableComparable,
- V extends Writable, E extends Writable>
- implements GraphPartitionerFactory<I, V, E> {
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/main/java/org/apache/giraph/partition/RangeSplitHint.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/RangeSplitHint.java b/giraph-core/src/main/java/org/apache/giraph/partition/RangeSplitHint.java
deleted file mode 100644
index 4317944..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/partition/RangeSplitHint.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.partition;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Hint to the {@link RangeMasterPartitioner} about how a
- * {@link RangePartitionOwner} can be split.
- *
- * @param <I> Vertex index to split around
- */
-@SuppressWarnings("rawtypes")
-public class RangeSplitHint<I extends WritableComparable>
- implements Writable, ImmutableClassesGiraphConfigurable {
- /** Hinted split index */
- private I splitIndex;
- /** Number of vertices in this range before the split */
- private long preSplitVertexCount;
- /** Number of vertices in this range after the split */
- private long postSplitVertexCount;
- /** Configuration */
- private ImmutableClassesGiraphConfiguration<I, ?, ?> conf;
-
- @Override
- public void readFields(DataInput input) throws IOException {
- splitIndex = conf.createVertexId();
- splitIndex.readFields(input);
- preSplitVertexCount = input.readLong();
- postSplitVertexCount = input.readLong();
- }
-
- @Override
- public void write(DataOutput output) throws IOException {
- splitIndex.write(output);
- output.writeLong(preSplitVertexCount);
- output.writeLong(postSplitVertexCount);
- }
-
- @Override
- public ImmutableClassesGiraphConfiguration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(ImmutableClassesGiraphConfiguration conf) {
- this.conf = conf;
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/main/java/org/apache/giraph/partition/RangeWorkerPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/RangeWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/RangeWorkerPartitioner.java
deleted file mode 100644
index cbcd753..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/partition/RangeWorkerPartitioner.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.partition;
-
-import java.util.Collection;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Range partitioning will split the vertices by a key range based on a generic
- * type. This allows vertices that have some locality with the vertex ids
- * to reduce the amount of messages sent. The tradeoffs are that
- * range partitioning is more susceptible to hot spots if the keys
- * are not randomly distributed. Another negative is the user must implement
- * some of the functionality around how to split the key range.
- *
- * Note: This implementation is incomplete, the developer must implement the
- * various methods based on their index type.
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- */
-@SuppressWarnings("rawtypes")
-public abstract class RangeWorkerPartitioner<I extends WritableComparable,
- V extends Writable, E extends Writable> implements
- WorkerGraphPartitioner<I, V, E> {
- /** Mapping of the vertex ids to the {@link PartitionOwner} */
- protected NavigableMap<I, RangePartitionOwner<I>> vertexRangeMap =
- new TreeMap<I, RangePartitionOwner<I>>();
-
- @Override
- public PartitionOwner createPartitionOwner() {
- return new RangePartitionOwner<I>();
- }
-
- @Override
- public PartitionOwner getPartitionOwner(I vertexId) {
- // Find the partition owner based on the maximum partition id.
- // If the vertex id exceeds any of the maximum partition ids, give
- // it to the last one
- if (vertexId == null) {
- throw new IllegalArgumentException(
- "getPartitionOwner: Illegal null vertex id");
- }
- I maxVertexIndex = vertexRangeMap.ceilingKey(vertexId);
- if (maxVertexIndex == null) {
- return vertexRangeMap.lastEntry().getValue();
- } else {
- return vertexRangeMap.get(vertexId);
- }
- }
-
- @Override
- public Collection<? extends PartitionOwner> getPartitionOwners() {
- return vertexRangeMap.values();
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java
index 7aee84c..8ab692f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java
@@ -31,47 +31,32 @@ import org.apache.hadoop.io.Writable;
* @param <V> Vertex value type
* @param <E> Edge value type
*/
-public class SimpleIntRangePartitionerFactory<V extends Writable,
- E extends Writable>
- implements GraphPartitionerFactory<IntWritable, V, E> {
- /** Configuration. */
- private ImmutableClassesGiraphConfiguration conf;
+public class SimpleIntRangePartitionerFactory
+ <V extends Writable, E extends Writable>
+ extends SimplePartitionerFactory<IntWritable, V, E> {
+
/** Vertex key space size. */
- private long keySpaceSize;
+ private int keySpaceSize;
@Override
- public MasterGraphPartitioner<IntWritable, V, E>
- createMasterGraphPartitioner() {
- return new SimpleRangeMasterPartitioner<IntWritable, V, E>(conf);
+ protected int getPartition(IntWritable id, int partitionCount) {
+ return getPartitionInRange(id.get(), keySpaceSize, partitionCount);
}
@Override
- public WorkerGraphPartitioner<IntWritable, V, E>
- createWorkerGraphPartitioner() {
- return new SimpleRangeWorkerPartitioner<IntWritable, V, E>(
- keySpaceSize) {
- @Override
- protected long vertexKeyFromId(IntWritable id) {
- // The modulo is just a safeguard in case keySpaceSize is incorrect.
- return id.get() % keySpaceSize;
- }
- };
+ protected int getWorker(int partition, int partitionCount, int workerCount) {
+ return getPartitionInRange(partition, partitionCount, workerCount);
}
@Override
public void setConf(ImmutableClassesGiraphConfiguration conf) {
- this.conf = conf;
- keySpaceSize = conf.getLong(GiraphConstants.PARTITION_VERTEX_KEY_SPACE_SIZE,
- -1);
+ super.setConf(conf);
+ keySpaceSize =
+ conf.getInt(GiraphConstants.PARTITION_VERTEX_KEY_SPACE_SIZE, -1);
if (keySpaceSize == -1) {
- throw new IllegalStateException("Need to specify " + GiraphConstants
- .PARTITION_VERTEX_KEY_SPACE_SIZE + " when using " +
- "SimpleRangePartitioner");
+ throw new IllegalStateException("Need to specify " +
+ GiraphConstants.PARTITION_VERTEX_KEY_SPACE_SIZE +
+ " when using SimpleIntRangePartitionerFactory");
}
}
-
- @Override
- public ImmutableClassesGiraphConfiguration getConf() {
- return conf;
- }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java
index 64efde9..2989598 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java
@@ -31,47 +31,32 @@ import org.apache.hadoop.io.Writable;
* @param <V> Vertex value type
* @param <E> Edge value type
*/
-public class SimpleLongRangePartitionerFactory<V extends Writable,
- E extends Writable>
- implements GraphPartitionerFactory<LongWritable, V, E> {
- /** Configuration. */
- private ImmutableClassesGiraphConfiguration conf;
+public class SimpleLongRangePartitionerFactory
+ <V extends Writable, E extends Writable>
+ extends SimplePartitionerFactory<LongWritable, V, E> {
+
/** Vertex key space size. */
private long keySpaceSize;
@Override
- public MasterGraphPartitioner<LongWritable, V, E>
- createMasterGraphPartitioner() {
- return new SimpleRangeMasterPartitioner<LongWritable, V, E>(conf);
+ protected int getPartition(LongWritable id, int partitionCount) {
+ return getPartitionInRange(id.get(), keySpaceSize, partitionCount);
}
@Override
- public WorkerGraphPartitioner<LongWritable, V, E>
- createWorkerGraphPartitioner() {
- return new SimpleRangeWorkerPartitioner<LongWritable, V, E>(
- keySpaceSize) {
- @Override
- protected long vertexKeyFromId(LongWritable id) {
- // The modulo is just a safeguard in case keySpaceSize is incorrect.
- return id.get() % keySpaceSize;
- }
- };
+ protected int getWorker(int partition, int partitionCount, int workerCount) {
+ return getPartitionInRange(partition, partitionCount, workerCount);
}
@Override
public void setConf(ImmutableClassesGiraphConfiguration conf) {
- this.conf = conf;
- keySpaceSize = conf.getLong(GiraphConstants.PARTITION_VERTEX_KEY_SPACE_SIZE,
- -1);
+ super.setConf(conf);
+ keySpaceSize =
+ conf.getLong(GiraphConstants.PARTITION_VERTEX_KEY_SPACE_SIZE, -1);
if (keySpaceSize == -1) {
- throw new IllegalStateException("Need to specify " + GiraphConstants
- .PARTITION_VERTEX_KEY_SPACE_SIZE + " when using " +
- "SimpleRangePartitioner");
+ throw new IllegalStateException("Need to specify " +
+ GiraphConstants.PARTITION_VERTEX_KEY_SPACE_SIZE +
+ " when using SimpleLongRangePartitionerFactory");
}
}
-
- @Override
- public ImmutableClassesGiraphConfiguration getConf() {
- return conf;
- }
}
http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java
new file mode 100644
index 0000000..f128f34
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleMasterPartitioner.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+/**
+ * Abstracts and implements all MasterGraphPartitioner logic on top of a single
+ * user function - getWorkerIndex.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ */
+public abstract class SimpleMasterPartitioner<I extends WritableComparable,
+    V extends Writable, E extends Writable>
+    implements MasterGraphPartitioner<I, V, E> {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(SimpleMasterPartitioner.class);
+  /** Provided configuration */
+  private final ImmutableClassesGiraphConfiguration conf;
+  /** Partition owners built by the last createInitialPartitionOwners call */
+  private List<PartitionOwner> partitionOwnerList;
+
+  /**
+   * Constructor.
+   *
+   * @param conf Configuration used.
+   */
+  public SimpleMasterPartitioner(ImmutableClassesGiraphConfiguration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Collection<PartitionOwner> createInitialPartitionOwners(
+      Collection<WorkerInfo> availableWorkerInfos, int maxWorkers) {
+    int partitionCount = PartitionUtils.computePartitionCount(
+        availableWorkerInfos, maxWorkers, conf);
+    List<WorkerInfo> workerList =
+        new ArrayList<WorkerInfo>(availableWorkerInfos);
+    // Partition i goes to the worker whose list index getWorkerIndex returns
+    partitionOwnerList = new ArrayList<PartitionOwner>();
+    for (int i = 0; i < partitionCount; i++) {
+      partitionOwnerList.add(new BasicPartitionOwner(i, workerList.get(
+          getWorkerIndex(i, partitionCount, workerList.size()))));
+    }
+
+    return partitionOwnerList;
+  }
+
+  @Override
+  public Collection<PartitionOwner> generateChangedPartitionOwners(
+      Collection<PartitionStats> allPartitionStatsList,
+      Collection<WorkerInfo> availableWorkers,
+      int maxWorkers, long superstep) {
+    // maxWorkers and superstep are not consulted; PartitionBalancer decides
+    return PartitionBalancer.balancePartitionsAcrossWorkers(conf,
+        partitionOwnerList, allPartitionStatsList, availableWorkers);
+  }
+
+  @Override
+  public Collection<PartitionOwner> getCurrentPartitionOwners() {
+    return partitionOwnerList;
+  }
+
+  @Override
+  public PartitionStats createPartitionStats() {
+    return new PartitionStats();
+  }
+
+  /**
+   * Calculates worker that should be responsible for passed partition.
+   *
+   * @param partition Current partition
+   * @param partitionCount Number of partitions
+   * @param workerCount Number of workers
+   * @return index of worker responsible for current partition
+   */
+  protected abstract int getWorkerIndex(
+      int partition, int partitionCount, int workerCount);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionerFactory.java
new file mode 100644
index 0000000..15b0756
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimplePartitionerFactory.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Abstracts and implements all GraphPartitionerFactory logic on top of two
+ * functions which define the partitioning scheme:
+ * - which partition a vertex should be in, and
+ * - which worker each partition should belong to
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ */
+public abstract class SimplePartitionerFactory<I extends WritableComparable,
+ V extends Writable, E extends Writable>
+ implements GraphPartitionerFactory<I, V, E> {
+ /** Configuration. */
+ private ImmutableClassesGiraphConfiguration conf;
+
+ @Override
+ public final MasterGraphPartitioner<I, V, E> createMasterGraphPartitioner() {
+ return new SimpleMasterPartitioner<I, V, E>(conf) {
+ @Override
+ protected int getWorkerIndex(int partition, int partitionCount,
+ int workerCount) {
+ return SimplePartitionerFactory.this.getWorker(
+ partition, partitionCount, workerCount);
+ }
+ };
+ }
+
+ @Override
+ public final WorkerGraphPartitioner<I, V, E> createWorkerGraphPartitioner() {
+ return new SimpleWorkerPartitioner<I, V, E>() {
+ @Override
+ protected int getPartitionIndex(I id, int partitionCount) {
+ return SimplePartitionerFactory.this.getPartition(id, partitionCount);
+ }
+ };
+ }
+
+ @Override
+ public void setConf(ImmutableClassesGiraphConfiguration conf) {
+ this.conf = conf;
+ }
+
+ @Override
+ public final ImmutableClassesGiraphConfiguration getConf() {
+ return conf;
+ }
+
+ /**
+ * Calculates which partition the given vertex belongs to,
+ * from the interval [0, partitionCount).
+ *
+ * @param id Vertex id
+ * @param partitionCount Number of partitions
+ * @return partition
+ */
+ protected abstract int getPartition(I id, int partitionCount);
+ /**
+ * Calculates the worker that should be responsible for the given partition.
+ *
+ * @param partition Current partition
+ * @param partitionCount Number of partitions
+ * @param workerCount Number of workers
+ * @return index of worker responsible for current partition
+ */
+ protected abstract int getWorker(
+ int partition, int partitionCount, int workerCount);
+
+ /**
+ * Utility function for calculating which partition an int value
+ * from the interval [0, max) belongs to.
+ *
+ * @param value Value for which partition is requested
+ * @param max Upper bound (exclusive) of the value interval
+ * @param partitions Number of partitions, equally sized.
+ * @return Index of the partition the value belongs to.
+ */
+ public static int getPartitionInRange(int value, int max, int partitions) {
+ double keyRange = ((double) max) / partitions;
+ int part = (int) ((value % max) / keyRange);
+ return Math.max(0, Math.min(partitions - 1, part));
+ }
+
+ /**
+ * Utility function for calculating which partition a long value
+ * from the interval [0, max) belongs to.
+ *
+ * @param value Value for which partition is requested
+ * @param max Upper bound (exclusive) of the value interval
+ * @param partitions Number of partitions, equally sized.
+ * @return Index of the partition the value belongs to.
+ */
+ public static int getPartitionInRange(long value, long max, int partitions) {
+ double keyRange = ((double) max) / partitions;
+ int part = (int) ((value % max) / keyRange);
+ return Math.max(0, Math.min(partitions - 1, part));
+ }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeMasterPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeMasterPartitioner.java
deleted file mode 100644
index 37ce8c7..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeMasterPartitioner.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.partition;
-
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.worker.WorkerInfo;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.log4j.Logger;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * A range-based master partitioner where equal-sized ranges of partitions
- * are deterministically assigned to workers.
- *
- * @param <I> Vertex id type
- * @param <V> Vertex value type
- * @param <E> Edge value type
- */
-public class SimpleRangeMasterPartitioner<I extends WritableComparable,
- V extends Writable, E extends Writable> implements
- MasterGraphPartitioner<I, V, E> {
- /** Class logger */
- private static Logger LOG = Logger.getLogger(HashMasterPartitioner.class);
- /** Provided configuration */
- private ImmutableClassesGiraphConfiguration conf;
- /** Save the last generated partition owner list */
- private List<PartitionOwner> partitionOwnerList;
-
- /**
- * Constructor.
- *
- * @param conf Configuration used.
- */
- public SimpleRangeMasterPartitioner(
- ImmutableClassesGiraphConfiguration conf) {
- this.conf = conf;
- }
-
- @Override
- public Collection<PartitionOwner> createInitialPartitionOwners(
- Collection<WorkerInfo> availableWorkerInfos, int maxWorkers) {
- int partitionCount = PartitionUtils.computePartitionCount(
- availableWorkerInfos, maxWorkers, conf);
- int rangeSize = partitionCount / availableWorkerInfos.size();
-
- partitionOwnerList = new ArrayList<PartitionOwner>();
- Iterator<WorkerInfo> workerIt = availableWorkerInfos.iterator();
- WorkerInfo currentWorker = null;
-
- int i = 0;
- for (; i < partitionCount; ++i) {
- if (i % rangeSize == 0) {
- if (!workerIt.hasNext()) {
- break;
- }
- currentWorker = workerIt.next();
- }
- partitionOwnerList.add(new BasicPartitionOwner(i, currentWorker));
- }
-
- // Distribute the remainder among all workers.
- if (i < partitionCount) {
- workerIt = availableWorkerInfos.iterator();
- for (; i < partitionCount; ++i) {
- partitionOwnerList.add(new BasicPartitionOwner(i, workerIt.next()));
- }
- }
-
- return partitionOwnerList;
- }
-
- @Override
- public Collection<PartitionOwner> generateChangedPartitionOwners(
- Collection<PartitionStats> allPartitionStatsList,
- Collection<WorkerInfo> availableWorkers,
- int maxWorkers,
- long superstep) {
- return PartitionBalancer.balancePartitionsAcrossWorkers(
- conf,
- partitionOwnerList,
- allPartitionStatsList,
- availableWorkers);
- }
-
- @Override
- public Collection<PartitionOwner> getCurrentPartitionOwners() {
- return partitionOwnerList;
- }
-
- @Override
- public PartitionStats createPartitionStats() {
- return new PartitionStats();
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeWorkerPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeWorkerPartitioner.java
deleted file mode 100644
index ab2afd5..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeWorkerPartitioner.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.partition;
-
-import com.google.common.collect.Lists;
-import org.apache.giraph.worker.WorkerInfo;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.util.Collection;
-import java.util.List;
-
-/**
- * A range-based worker partitioner where equal-sized ranges of vertex ids
- * are deterministically assigned to partitions.
- * The user has to define a mapping from vertex ids to long keys dense in
- * [0, keySpaceSize).
- *
- * @param <I> Vertex id type
- * @param <V> Vertex value type
- * @param <E> Edge value type
- */
-public abstract class SimpleRangeWorkerPartitioner<I extends
- WritableComparable, V extends Writable, E extends Writable>
- implements WorkerGraphPartitioner<I, V, E> {
- /** List of {@link PartitionOwner}s for this worker. */
- private List<PartitionOwner> partitionOwnerList = Lists.newArrayList();
- /** Vertex keys space size. */
- private long keySpaceSize;
-
- /**
- * Constructor.
- *
- * @param keySpaceSize Vertex keys space size.
- */
- public SimpleRangeWorkerPartitioner(long keySpaceSize) {
- this.keySpaceSize = keySpaceSize;
- }
-
- /**
- * Get key space size (can be used when implementing vertexKeyFromId()).
- *
- * @return Key space size.
- */
- public long getKeySpaceSize() {
- return keySpaceSize;
- }
-
- /**
- * Convert a vertex id to a unique long key in [0, keySpaceSize].
- *
- * @param id Vertex id
- * @return Unique long key
- */
- protected abstract long vertexKeyFromId(I id);
-
- @Override
- public PartitionOwner createPartitionOwner() {
- return new BasicPartitionOwner();
- }
-
- @Override
- public PartitionOwner getPartitionOwner(I vertexId) {
- long rangeSize = keySpaceSize / partitionOwnerList.size();
- return partitionOwnerList.get(
- Math.min((int) (vertexKeyFromId(vertexId) / rangeSize),
- partitionOwnerList.size() - 1));
- }
-
- @Override
- public Collection<PartitionStats> finalizePartitionStats(
- Collection<PartitionStats> workerPartitionStats,
- PartitionStore<I, V, E> partitionStore) {
- // No modification necessary
- return workerPartitionStats;
- }
-
- @Override
- public PartitionExchange updatePartitionOwners(
- WorkerInfo myWorkerInfo,
- Collection<? extends PartitionOwner> masterSetPartitionOwners,
- PartitionStore<I, V, E> partitionStore) {
- return PartitionBalancer.updatePartitionOwners(partitionOwnerList,
- myWorkerInfo, masterSetPartitionOwners, partitionStore);
- }
-
- @Override
- public Collection<? extends PartitionOwner> getPartitionOwners() {
- return partitionOwnerList;
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java
new file mode 100644
index 0000000..600d7a3
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleWorkerPartitioner.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Abstracts and implements all WorkerGraphPartitioner logic on top of a single
+ * user function - getPartitionIndex.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ */
+public abstract class SimpleWorkerPartitioner<I extends WritableComparable,
+ V extends Writable, E extends Writable>
+ implements WorkerGraphPartitioner<I, V, E> {
+ /** List of {@link PartitionOwner}s for this worker. */
+ private List<PartitionOwner> partitionOwnerList = Lists.newArrayList();
+
+ @Override
+ public PartitionOwner createPartitionOwner() {
+ return new BasicPartitionOwner();
+ }
+
+ @Override
+ public PartitionOwner getPartitionOwner(I vertexId) {
+ return partitionOwnerList.get(
+ getPartitionIndex(vertexId, partitionOwnerList.size()));
+ }
+
+ @Override
+ public Collection<PartitionStats> finalizePartitionStats(
+ Collection<PartitionStats> workerPartitionStats,
+ PartitionStore<I, V, E> partitionStore) {
+ // No modification necessary
+ return workerPartitionStats;
+ }
+
+ @Override
+ public PartitionExchange updatePartitionOwners(WorkerInfo myWorkerInfo,
+ Collection<? extends PartitionOwner> masterSetPartitionOwners,
+ PartitionStore<I, V, E> partitionStore) {
+ return PartitionBalancer.updatePartitionOwners(partitionOwnerList,
+ myWorkerInfo, masterSetPartitionOwners, partitionStore);
+ }
+
+ @Override
+ public Collection<? extends PartitionOwner> getPartitionOwners() {
+ return partitionOwnerList;
+ }
+
+ /**
+ * Calculates which partition the given vertex belongs to,
+ * from the interval [0, partitionCount).
+ *
+ * @param id Vertex id
+ * @param partitionCount Number of partitions
+ * @return partition
+ */
+ protected abstract int getPartitionIndex(I id, int partitionCount);
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/cdb49fd5/giraph-core/src/test/java/org/apache/giraph/partition/SimpleRangePartitionFactoryTest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/partition/SimpleRangePartitionFactoryTest.java b/giraph-core/src/test/java/org/apache/giraph/partition/SimpleRangePartitionFactoryTest.java
new file mode 100644
index 0000000..4e19cd2
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/partition/SimpleRangePartitionFactoryTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.partition.PartitionOwner;
+import org.apache.giraph.partition.WorkerGraphPartitioner;
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+
+/** Test {@link org.apache.giraph.partition.SimpleLongRangePartitionerFactory}. */
+public class SimpleRangePartitionFactoryTest {
+
+ private void testRange(int numWorkers, int keySpaceSize, int allowedWorkerDiff, boolean emptyWorkers) {
+ Configuration conf = new Configuration();
+ conf.setLong(GiraphConstants.PARTITION_VERTEX_KEY_SPACE_SIZE, keySpaceSize);
+ SimpleLongRangePartitionerFactory<Writable, Writable> factory =
+ new SimpleLongRangePartitionerFactory<Writable, Writable>();
+ factory.setConf(new ImmutableClassesGiraphConfiguration(conf));
+
+ ArrayList<WorkerInfo> infos = new ArrayList<WorkerInfo>();
+ for (int i = 0; i < numWorkers; i++) {
+ WorkerInfo info = new WorkerInfo();
+ info.setTaskId(i);
+ infos.add(info);
+ }
+
+ Collection<PartitionOwner> owners =
+ factory.createMasterGraphPartitioner().createInitialPartitionOwners(infos, -1);
+
+ int[] tasks = new int[owners.size()];
+ for (PartitionOwner owner : owners) {
+ WorkerInfo worker = owner.getWorkerInfo();
+ assertEquals(0, tasks[owner.getPartitionId()]);
+ tasks[owner.getPartitionId()] = worker.getTaskId() + 1;
+ }
+ checkMapping(tasks, allowedWorkerDiff, emptyWorkers);
+
+ WorkerGraphPartitioner<LongWritable, Writable, Writable> workerPartitioner =
+ factory.createWorkerGraphPartitioner();
+ workerPartitioner.updatePartitionOwners(null, owners, null);
+ LongWritable longWritable = new LongWritable();
+
+ int[] partitions = new int[keySpaceSize];
+ for (int i = 0; i < keySpaceSize; i++) {
+ longWritable.set(i);
+ PartitionOwner owner = workerPartitioner.getPartitionOwner(longWritable);
+ partitions[i] = owner.getPartitionId();
+ }
+ checkMapping(partitions, 1, emptyWorkers);
+ }
+
+ private void checkMapping(int[] mapping, int allowedDiff, boolean emptyWorkers) {
+ int prev = -1;
+
+ int max = 0;
+ int min = Integer.MAX_VALUE;
+ int cur = 0;
+ for (int value : mapping) {
+ if (value != prev) {
+ if (prev != -1) {
+ min = Math.min(cur, min);
+ max = Math.max(cur, max);
+ assertTrue(prev < value);
+ if (!emptyWorkers) {
+ assertEquals(prev + 1, value);
+ }
+ }
+ cur = 1;
+ } else {
+ cur++;
+ }
+ prev = value;
+ }
+ assertTrue(min + allowedDiff >= max);
+ }
+
+ @Test
+ public void testLongRangePartitionerFactory() {
+ // perfect distribution
+ testRange(10, 100000, 0, false);
+ testRange(1000, 100000, 0, false);
+
+ // perfect distribution even when max is hit, and max is not divisible by #workers
+ testRange(8949, 100023, 0, false);
+ testRange(1949, 211111, 0, false);
+
+ // imperfect distribution - because there are more workers than max partitions.
+ testRange(194942, 211111, 1, true);
+ }
+}