You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ni...@apache.org on 2013/01/04 21:52:39 UTC
[14/23] GIRAPH-409: Refactor / cleanups (nitay)
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashPartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashPartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashPartitionerFactory.java
deleted file mode 100644
index 69e7a5e..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashPartitionerFactory.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph.partition;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Partitioner factory for plain hash partitioning: on the worker side a
- * vertex lands on the partition owner selected by its id's hash code modulo
- * the number of partition owners. Well-spread hash codes therefore give an
- * even distribution of vertices.
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message value
- */
-@SuppressWarnings("rawtypes")
-public class HashPartitionerFactory<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    implements GraphPartitionerFactory<I, V, E, M> {
-  /** Configuration handed in through {@link #setConf}. */
-  private ImmutableClassesGiraphConfiguration configuration;
-
-  @Override
-  public ImmutableClassesGiraphConfiguration getConf() {
-    return configuration;
-  }
-
-  @Override
-  public void setConf(ImmutableClassesGiraphConfiguration conf) {
-    configuration = conf;
-  }
-
-  @Override
-  public MasterGraphPartitioner<I, V, E, M> createMasterGraphPartitioner() {
-    // Go through getConf() so subclasses overriding it are honored.
-    ImmutableClassesGiraphConfiguration currentConf = getConf();
-    return new HashMasterPartitioner<I, V, E, M>(currentConf);
-  }
-
-  @Override
-  public WorkerGraphPartitioner<I, V, E, M> createWorkerGraphPartitioner() {
-    return new HashWorkerPartitioner<I, V, E, M>();
-  }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashRangePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashRangePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashRangePartitionerFactory.java
deleted file mode 100644
index df6457b..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashRangePartitionerFactory.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph.partition;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Partitioner factory that divides the id hash-code space into contiguous
- * ranges, one per partition owner. The master side is the same
- * {@link HashMasterPartitioner} used for plain hashing; only the worker-side
- * mapping differs.
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message value
- */
-@SuppressWarnings("rawtypes")
-public class HashRangePartitionerFactory<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    implements GraphPartitionerFactory<I, V, E, M> {
-  /** Configuration handed in through {@link #setConf}. */
-  private ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
-
-  @Override
-  public ImmutableClassesGiraphConfiguration getConf() {
-    return configuration;
-  }
-
-  @Override
-  public void setConf(ImmutableClassesGiraphConfiguration conf) {
-    configuration = conf;
-  }
-
-  @Override
-  public MasterGraphPartitioner<I, V, E, M> createMasterGraphPartitioner() {
-    // Go through getConf() so subclasses overriding it are honored.
-    ImmutableClassesGiraphConfiguration currentConf = getConf();
-    return new HashMasterPartitioner<I, V, E, M>(currentConf);
-  }
-
-  @Override
-  public WorkerGraphPartitioner<I, V, E, M> createWorkerGraphPartitioner() {
-    return new HashRangeWorkerPartitioner<I, V, E, M>();
-  }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashRangeWorkerPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashRangeWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashRangeWorkerPartitioner.java
deleted file mode 100644
index ea2cf66..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashRangeWorkerPartitioner.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph.partition;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import com.google.common.primitives.UnsignedInts;
-
-/**
- * Worker partitioner that maps a vertex id to a partition owner by cutting
- * the unsigned 32-bit hash-code space into contiguous, roughly equal ranges,
- * one range per partition owner.
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message value
- */
-@SuppressWarnings("rawtypes")
-public class HashRangeWorkerPartitioner<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    extends HashWorkerPartitioner<I, V, E, M> {
-  /**
-   * Exclusive upper bound of an unsigned 32-bit hash code
-   * (2 * Integer.MAX_VALUE + 2, i.e. 2^32).
-   */
-  private static final long HASH_LIMIT = 2L * Integer.MAX_VALUE + 2L;
-
-  @Override
-  public PartitionOwner getPartitionOwner(I vertexId) {
-    // Reinterpret the signed hash as an unsigned value in [0, 2^32).
-    long hash = UnsignedInts.toLong(vertexId.hashCode());
-    long ownerCount = getPartitionOwners().size();
-    // hash < 2^32 and ownerCount < 2^31, so the product fits in a long.
-    // hash 0 maps to index 0 and hash HASH_LIMIT - 1 maps to
-    // ownerCount - 1.
-    long rangeIndex = (hash * ownerCount) / HASH_LIMIT;
-    return partitionOwnerList.get((int) rangeIndex);
-  }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java
deleted file mode 100644
index a76f803..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/partition/HashWorkerPartitioner.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph.partition;
-
-import com.google.common.collect.Lists;
-import org.apache.giraph.graph.WorkerInfo;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Worker partitioner that selects a vertex's partition owner by taking the
- * absolute value of the id's hash code modulo the number of partition
- * owners.
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message value
- */
-@SuppressWarnings("rawtypes")
-public class HashWorkerPartitioner<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    implements WorkerGraphPartitioner<I, V, E, M> {
-  /**
-   * Partition owners as last set by the master; a vertex id hash selects an
-   * index into this list.
-   */
-  protected List<PartitionOwner> partitionOwnerList =
-      Lists.newArrayList();
-
-  @Override
-  public PartitionOwner createPartitionOwner() {
-    return new BasicPartitionOwner();
-  }
-
-  @Override
-  public PartitionOwner getPartitionOwner(I vertexId) {
-    int hash = vertexId.hashCode();
-    // |hash % n| < n, so taking abs() of the remainder is always a valid
-    // index, even for negative hash codes.
-    int index = Math.abs(hash % partitionOwnerList.size());
-    return partitionOwnerList.get(index);
-  }
-
-  @Override
-  public Collection<PartitionStats> finalizePartitionStats(
-      Collection<PartitionStats> workerPartitionStats,
-      PartitionStore<I, V, E, M> partitionStore) {
-    // Hash partitioning passes the stats through untouched.
-    return workerPartitionStats;
-  }
-
-  @Override
-  public PartitionExchange updatePartitionOwners(
-      WorkerInfo myWorkerInfo,
-      Collection<? extends PartitionOwner> masterSetPartitionOwners,
-      PartitionStore<I, V, E, M> partitionStore) {
-    partitionOwnerList.clear();
-    partitionOwnerList.addAll(masterSetPartitionOwners);
-
-    Set<WorkerInfo> workersToWaitFor = new HashSet<WorkerInfo>();
-    Map<WorkerInfo, List<Integer>> partitionsToSend =
-        new HashMap<WorkerInfo, List<Integer>>();
-    for (PartitionOwner owner : masterSetPartitionOwners) {
-      WorkerInfo previousWorker = owner.getPreviousWorkerInfo();
-      if (previousWorker == null) {
-        // No previous worker recorded: nothing to exchange for this one.
-        continue;
-      }
-      WorkerInfo currentWorker = owner.getWorkerInfo();
-      boolean isMineNow = currentWorker.equals(myWorkerInfo);
-      boolean wasMine = previousWorker.equals(myWorkerInfo);
-      if (isMineNow && wasMine) {
-        throw new IllegalStateException(
-            "updatePartitionOwners: Impossible to have the same " +
-            "previous and current worker info " + owner +
-            " as me " + myWorkerInfo);
-      }
-      if (isMineNow) {
-        // Moving to me: wait for the worker that held it previously.
-        workersToWaitFor.add(previousWorker);
-      } else if (wasMine) {
-        // Moving away from me: queue its id for the new owner.
-        List<Integer> partitionIds = partitionsToSend.get(currentWorker);
-        if (partitionIds == null) {
-          partitionIds = new ArrayList<Integer>();
-          partitionsToSend.put(currentWorker, partitionIds);
-        }
-        partitionIds.add(owner.getPartitionId());
-      }
-    }
-
-    return new PartitionExchange(workersToWaitFor, partitionsToSend);
-  }
-
-  @Override
-  public Collection<? extends PartitionOwner> getPartitionOwners() {
-    return partitionOwnerList;
-  }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/partition/MasterGraphPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/MasterGraphPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/MasterGraphPartitioner.java
deleted file mode 100644
index e911303..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/partition/MasterGraphPartitioner.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph.partition;
-
-import java.util.Collection;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.giraph.graph.WorkerInfo;
-
-/**
- * Master-side strategy that decides how the graph is split into partitions,
- * how those partitions are manipulated, and which worker owns each one.
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message value
- */
-@SuppressWarnings("rawtypes")
-public interface MasterGraphPartitioner<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable> {
-  /**
-   * Create the initial partition owners. Guaranteed to be called before the
-   * graph is loaded, both on a first run and on a restart.
-   *
-   * @param availableWorkerInfos Workers partitions may be assigned to
-   * @param maxWorkers Maximum number of workers
-   * @return The generated partition owners
-   */
-  Collection<PartitionOwner> createInitialPartitionOwners(
-      Collection<WorkerInfo> availableWorkerInfos,
-      int maxWorkers);
-
-  /**
-   * Called once all worker stats have been merged into one list; lets the
-   * master tell workers about {@link Partition} changes. The details of that
-   * protocol belong to each {@link MasterGraphPartitioner} implementation.
-   *
-   * @param allPartitionStatsList Partition stats collected from every worker
-   * @param availableWorkers Workers partitions may be assigned to
-   * @param maxWorkers Maximum number of workers
-   * @param superstep Superstep the resulting partition owners apply to
-   * @return {@link PartitionOwner} objects that changed since the previous
-   *         superstep, or an empty collection if nothing changed
-   */
-  Collection<PartitionOwner> generateChangedPartitionOwners(
-      Collection<PartitionStats> allPartitionStatsList,
-      Collection<WorkerInfo> availableWorkers,
-      int maxWorkers,
-      long superstep);
-
-  /**
-   * Get the partition owners as they stand right now.
-   *
-   * @return Current {@link PartitionOwner} objects
-   */
-  Collection<PartitionOwner> getCurrentPartitionOwners();
-
-  /**
-   * Create an instance of the {@link PartitionStats} implementation used to
-   * read the stats reported by workers.
-   *
-   * @return Newly instantiated {@link PartitionStats}
-   */
-  PartitionStats createPartitionStats();
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/partition/Partition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/Partition.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/Partition.java
deleted file mode 100644
index b0f156f..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/partition/Partition.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph.partition;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import org.apache.hadoop.util.Progressable;
-
-/**
- * Generic container of vertices. Each vertex id maps to exactly one
- * partition.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- */
-@SuppressWarnings("rawtypes")
-public interface Partition<I extends WritableComparable,
-    V extends Writable, E extends Writable, M extends Writable>
-    extends Writable, ImmutableClassesGiraphConfigurable<I, V, E, M>,
-    Iterable<Vertex<I, V, E, M>> {
-  /**
-   * Prepare the partition for use. Guaranteed to be called before any other
-   * use of the partition.
-   *
-   * @param partitionId Id to give this partition
-   * @param progressable Progressable to report progress through
-   */
-  void initialize(int partitionId, Progressable progressable);
-
-  /**
-   * Look up a vertex by its index.
-   *
-   * @param vertexIndex Index of the vertex to find
-   * @return The vertex, or null if it is not in this partition
-   */
-  Vertex<I, V, E, M> getVertex(I vertexIndex);
-
-  /**
-   * Store a vertex in this partition.
-   *
-   * @param vertex Vertex to store
-   * @return The previously stored vertex, or null if none existed
-   */
-  Vertex<I, V, E, M> putVertex(Vertex<I, V, E, M> vertex);
-
-  /**
-   * Take a vertex out of this partition.
-   *
-   * @param vertexIndex Index of the vertex to remove
-   * @return The vertex that was removed
-   */
-  Vertex<I, V, E, M> removeVertex(I vertexIndex);
-
-  /**
-   * Merge in all vertices of another partition.
-   *
-   * @param partition Partition whose vertices are added
-   */
-  void addPartition(Partition<I, V, E, M> partition);
-
-  /**
-   * Count the vertices held by this partition.
-   *
-   * @return Number of vertices
-   */
-  long getVertexCount();
-
-  /**
-   * Count the edges held by this partition.
-   *
-   * @return Number of edges
-   */
-  long getEdgeCount();
-
-  /**
-   * Get this partition's id.
-   *
-   * @return Partition id
-   */
-  int getId();
-
-  /**
-   * Change this partition's id.
-   *
-   * @param id New partition id
-   */
-  void setId(int id);
-
-  /**
-   * Set the {@link Progressable} used to report progress.
-   *
-   * @param progressable Progressable
-   */
-  void setProgressable(Progressable progressable);
-
-  /**
-   * Write a possibly modified vertex back into the partition.
-   *
-   * @param vertex Vertex to save
-   */
-  void saveVertex(Vertex<I, V, E, M> vertex);
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionBalancer.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionBalancer.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionBalancer.java
deleted file mode 100644
index 2d1c2a2..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionBalancer.java
+++ /dev/null
@@ -1,288 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph.partition;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.PriorityQueue;
-
-import org.apache.giraph.graph.WorkerInfo;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.log4j.Logger;
-
-/**
- * Helper class for balancing partitions across a set of workers.
- *
- * <p>The algorithm is read from {@link #PARTITION_BALANCE_ALGORITHM}.
- * {@link #STATIC_BALANCE_ALGORITHM} (the default) returns the partition
- * owners unchanged. {@link #EDGE_BALANCE_ALGORITHM} and
- * {@link #VERTICES_BALANCE_ALGORITHM} greedily reassign partitions so that
- * workers end up with similar total edge or vertex counts.
- */
-public class PartitionBalancer {
-  /** Partition balancing algorithm */
-  public static final String PARTITION_BALANCE_ALGORITHM =
-      "hash.partitionBalanceAlgorithm";
-  /** No rebalancing during the supersteps */
-  public static final String STATIC_BALANCE_ALGORITHM =
-      "static";
-  /** Rebalance across supersteps by edges */
-  public static final String EDGE_BALANCE_ALGORITHM =
-      "edges";
-  /**
-   * Misspelled name of {@link #EDGE_BALANCE_ALGORITHM}, kept so existing
-   * references keep compiling.
-   */
-  public static final String EGDE_BALANCE_ALGORITHM =
-      EDGE_BALANCE_ALGORITHM;
-  /** Rebalance across supersteps by vertices */
-  public static final String VERTICES_BALANCE_ALGORITHM =
-      "vertices";
-  /** Class logger */
-  private static final Logger LOG = Logger.getLogger(PartitionBalancer.class);
-
-  /**
-   * What value to balance partitions with? Edges, vertices?
-   */
-  private enum BalanceValue {
-    /** Not chosen */
-    UNSET,
-    /** Balance with edges */
-    EDGES,
-    /** Balance with vertices */
-    VERTICES
-  }
-
-  /**
-   * Do not construct this class.
-   */
-  private PartitionBalancer() { }
-
-  /**
-   * Three-way comparison of two longs, equivalent to
-   * {@code Long.compare(first, second)}. Used instead of
-   * {@code (int) (first - second)}, which overflows or truncates when the
-   * difference does not fit in an int and then reports the wrong sign.
-   *
-   * @param first First value
-   * @param second Second value
-   * @return Negative, zero or positive if first is less than, equal to or
-   *         greater than second
-   */
-  private static int compareLongs(long first, long second) {
-    if (first < second) {
-      return -1;
-    } else if (first > second) {
-      return 1;
-    } else {
-      return 0;
-    }
-  }
-
-  /**
-   * Get the value used to balance.
-   *
-   * @param partitionStat Stats of this partition.
-   * @param balanceValue Type of the value to balance.
-   * @return Balance value.
-   * @throws IllegalArgumentException if balanceValue is not EDGES/VERTICES
-   */
-  private static long getBalanceValue(PartitionStats partitionStat,
-      BalanceValue balanceValue) {
-    switch (balanceValue) {
-    case EDGES:
-      return partitionStat.getEdgeCount();
-    case VERTICES:
-      return partitionStat.getVertexCount();
-    default:
-      throw new IllegalArgumentException(
-          "getBalanceValue: Illegal balance value " + balanceValue);
-    }
-  }
-
-  /**
-   * Used to sort the partition owners from lowest value to highest value
-   */
-  private static class PartitionOwnerComparator implements
-      Comparator<PartitionOwner> {
-    /** Map of owner to stats */
-    private final Map<PartitionOwner, PartitionStats> ownerStatMap;
-    /** Value type to compare on */
-    private final BalanceValue balanceValue;
-
-    /**
-     * Only constructor.
-     *
-     * @param ownerStatMap Map of owners to stats.
-     * @param balanceValue Value to balance with.
-     */
-    public PartitionOwnerComparator(
-        Map<PartitionOwner, PartitionStats> ownerStatMap,
-        BalanceValue balanceValue) {
-      this.ownerStatMap = ownerStatMap;
-      this.balanceValue = balanceValue;
-    }
-
-    @Override
-    public int compare(PartitionOwner owner1, PartitionOwner owner2) {
-      // Overflow-safe: edge/vertex counts are longs whose difference may
-      // not fit in an int.
-      return compareLongs(
-          getBalanceValue(ownerStatMap.get(owner1), balanceValue),
-          getBalanceValue(ownerStatMap.get(owner2), balanceValue));
-    }
-  }
-
-  /**
-   * Structure to keep track of how much value a {@link WorkerInfo} has
-   * been assigned.
-   */
-  private static class WorkerInfoAssignments implements
-      Comparable<WorkerInfoAssignments> {
-    /** Worker info associated */
-    private final WorkerInfo workerInfo;
-    /** Balance value */
-    private final BalanceValue balanceValue;
-    /** Map of owner to stats */
-    private final Map<PartitionOwner, PartitionStats> ownerStatsMap;
-    /** Current value of this object */
-    private long value = 0;
-
-    /**
-     * Constructor with final values.
-     *
-     * @param workerInfo Worker info for assignment.
-     * @param balanceValue Value used to balance.
-     * @param ownerStatsMap Map of owner to stats.
-     */
-    public WorkerInfoAssignments(
-        WorkerInfo workerInfo,
-        BalanceValue balanceValue,
-        Map<PartitionOwner, PartitionStats> ownerStatsMap) {
-      this.workerInfo = workerInfo;
-      this.balanceValue = balanceValue;
-      this.ownerStatsMap = ownerStatsMap;
-    }
-
-    /**
-     * Get the total value of all partitions assigned to this worker.
-     *
-     * @return Total value of all partition assignments.
-     */
-    public long getValue() {
-      return value;
-    }
-
-    /**
-     * Assign a {@link PartitionOwner} to this {@link WorkerInfo}. If the
-     * owner moves, its old worker is recorded as the previous worker info;
-     * otherwise the previous worker info is cleared.
-     *
-     * @param partitionOwner PartitionOwner to assign.
-     */
-    public void assignPartitionOwner(
-        PartitionOwner partitionOwner) {
-      // Look up the stats before mutating the owner below; it is a key of
-      // ownerStatsMap (NOTE(review): matters only if PartitionOwner's
-      // hashCode depends on its worker info - confirm).
-      value += getBalanceValue(ownerStatsMap.get(partitionOwner),
-          balanceValue);
-      if (!partitionOwner.getWorkerInfo().equals(workerInfo)) {
-        partitionOwner.setPreviousWorkerInfo(
-            partitionOwner.getWorkerInfo());
-        partitionOwner.setWorkerInfo(workerInfo);
-      } else {
-        partitionOwner.setPreviousWorkerInfo(null);
-      }
-    }
-
-    @Override
-    public int compareTo(WorkerInfoAssignments other) {
-      // Overflow-safe ordering for the min-heap of workers.
-      return compareLongs(getValue(), other.getValue());
-    }
-  }
-
-  /**
-   * Balance the partitions with an algorithm based on a value.
-   *
-   * @param conf Configuration to find the algorithm
-   * @param partitionOwners All the owners of all partitions
-   * @param allPartitionStats All the partition stats
-   * @param availableWorkerInfos All the available workers
-   * @return Balanced partition owners (the input collection itself when the
-   *         static algorithm is configured)
-   * @throws IllegalArgumentException on an unknown algorithm, or when there
-   *         are partitions to place but no available workers
-   * @throws IllegalStateException when partition stats and owners do not
-   *         match one-to-one by partition id
-   */
-  public static Collection<PartitionOwner> balancePartitionsAcrossWorkers(
-      Configuration conf,
-      Collection<PartitionOwner> partitionOwners,
-      Collection<PartitionStats> allPartitionStats,
-      Collection<WorkerInfo> availableWorkerInfos) {
-
-    String balanceAlgorithm =
-        conf.get(PARTITION_BALANCE_ALGORITHM, STATIC_BALANCE_ALGORITHM);
-    if (LOG.isInfoEnabled()) {
-      LOG.info("balancePartitionsAcrossWorkers: Using algorithm " +
-          balanceAlgorithm);
-    }
-    BalanceValue balanceValue = BalanceValue.UNSET;
-    if (balanceAlgorithm.equals(STATIC_BALANCE_ALGORITHM)) {
-      return partitionOwners;
-    } else if (balanceAlgorithm.equals(EDGE_BALANCE_ALGORITHM)) {
-      balanceValue = BalanceValue.EDGES;
-    } else if (balanceAlgorithm.equals(VERTICES_BALANCE_ALGORITHM)) {
-      balanceValue = BalanceValue.VERTICES;
-    } else {
-      throw new IllegalArgumentException(
-          "balancePartitionsAcrossWorkers: Illegal balance " +
-          "algorithm - " + balanceAlgorithm);
-    }
-
-    // Join the partition stats and partition owners by partition id
-    Map<Integer, PartitionStats> idStatMap =
-        new HashMap<Integer, PartitionStats>();
-    for (PartitionStats partitionStats : allPartitionStats) {
-      if (idStatMap.put(partitionStats.getPartitionId(), partitionStats) !=
-          null) {
-        throw new IllegalStateException(
-            "balancePartitionsAcrossWorkers: Duplicate partition id " +
-            "for " + partitionStats);
-      }
-    }
-    Map<PartitionOwner, PartitionStats> ownerStatsMap =
-        new HashMap<PartitionOwner, PartitionStats>();
-    for (PartitionOwner partitionOwner : partitionOwners) {
-      PartitionStats partitionStats =
-          idStatMap.get(partitionOwner.getPartitionId());
-      if (partitionStats == null) {
-        throw new IllegalStateException(
-            "balancePartitionsAcrossWorkers: Missing partition " +
-            "stats for " + partitionOwner);
-      }
-      if (ownerStatsMap.put(partitionOwner, partitionStats) != null) {
-        throw new IllegalStateException(
-            "balancePartitionsAcrossWorkers: Duplicate partition " +
-            "owner " + partitionOwner);
-      }
-    }
-    if (ownerStatsMap.size() != partitionOwners.size()) {
-      throw new IllegalStateException(
-          "balancePartitionsAcrossWorkers: ownerStats count = " +
-          ownerStatsMap.size() + ", partitionOwners count = " +
-          partitionOwners.size() + " and should match.");
-    }
-
-    List<WorkerInfoAssignments> workerInfoAssignmentsList =
-        new ArrayList<WorkerInfoAssignments>(availableWorkerInfos.size());
-    for (WorkerInfo workerInfo : availableWorkerInfos) {
-      workerInfoAssignmentsList.add(
-          new WorkerInfoAssignments(
-              workerInfo, balanceValue, ownerStatsMap));
-    }
-
-    // A simple heuristic for balancing the partitions across the workers
-    // using a value (edges, vertices). An improvement would be to
-    // take into account the already existing partition worker assignments.
-    // 1. Sort the partitions by size
-    // 2. Place the workers in a min heap sorted by their total balance
-    // value.
-    // 3. From largest partition to the smallest, take the partition
-    // worker at the top of the heap, add the partition to it, and
-    // then put it back in the heap
-    List<PartitionOwner> partitionOwnerList =
-        new ArrayList<PartitionOwner>(partitionOwners);
-    Collections.sort(partitionOwnerList,
-        Collections.reverseOrder(
-            new PartitionOwnerComparator(ownerStatsMap, balanceValue)));
-    // Fail with a clear message rather than a bare NoSuchElementException
-    // from the empty heap below.
-    if (!partitionOwnerList.isEmpty() &&
-        workerInfoAssignmentsList.isEmpty()) {
-      throw new IllegalArgumentException(
-          "balancePartitionsAcrossWorkers: No available workers to " +
-          "assign " + partitionOwnerList.size() + " partitions to");
-    }
-    PriorityQueue<WorkerInfoAssignments> minQueue =
-        new PriorityQueue<WorkerInfoAssignments>(workerInfoAssignmentsList);
-    for (PartitionOwner partitionOwner : partitionOwnerList) {
-      WorkerInfoAssignments chosenWorker = minQueue.remove();
-      chosenWorker.assignPartitionOwner(partitionOwner);
-      minQueue.add(chosenWorker);
-    }
-
-    return partitionOwnerList;
-  }
-}
-
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionExchange.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionExchange.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionExchange.java
deleted file mode 100644
index 1b2be9a..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionExchange.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph.partition;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.giraph.graph.WorkerInfo;
-
-/**
- * Records what a worker has to do in a potential partition exchange: which
- * workers it must wait on and which partitions it must ship to which workers.
- */
-public class PartitionExchange {
- /** Workers this worker depends on before it is allowed to continue */
- private final Set<WorkerInfo> myDependencyWorkerSet;
- /** Partition ids this worker must send, keyed by destination worker */
- private final Map<WorkerInfo, List<Integer>> sendWorkerPartitionMap;
-
- /**
- * Only constructor. Both arguments are stored by reference (no copy).
- *
- * @param myDependencyWorkerSet Workers that must be waited on
- * @param sendWorkerPartitionMap Destination worker to partition ids to send
- */
- public PartitionExchange(
- Set<WorkerInfo> myDependencyWorkerSet,
- Map<WorkerInfo, List<Integer>> sendWorkerPartitionMap) {
- this.sendWorkerPartitionMap = sendWorkerPartitionMap;
- this.myDependencyWorkerSet = myDependencyWorkerSet;
- }
-
- /**
- * Workers that must be waited on before continuing.
- *
- * @return The dependency worker set passed to the constructor
- */
- public Set<WorkerInfo> getMyDependencyWorkerSet() {
- return myDependencyWorkerSet;
- }
-
- /**
- * Partitions to send, grouped by the worker that should receive them.
- *
- * @return The worker to partition id list map passed to the constructor
- */
- public Map<WorkerInfo, List<Integer>> getSendWorkerPartitionMap() {
- return sendWorkerPartitionMap;
- }
-
- /**
- * Whether this worker takes part in the exchange at all, either by
- * waiting on other workers or by sending partitions.
- *
- * @return False only when there is nothing to wait for and nothing to send
- */
- public boolean doExchange() {
- // Short-circuits exactly like the "a || b" form: the send map is only
- // consulted when the dependency set is empty.
- return !(myDependencyWorkerSet.isEmpty() &&
- sendWorkerPartitionMap.isEmpty());
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionOwner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionOwner.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionOwner.java
deleted file mode 100644
index 5616a8d..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionOwner.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph.partition;
-
-import org.apache.giraph.graph.WorkerInfo;
-import org.apache.hadoop.io.Writable;
-
-/**
- * Ownership metadata for a single partition: which worker holds it now, which
- * worker held it before, and where its checkpoint lives when restarting.
- */
-public interface PartitionOwner extends Writable {
- /**
- * Id of the {@link Partition} this metadata describes.
- *
- * @return Partition id
- */
- int getPartitionId();
-
- /**
- * Worker currently responsible for this partition.
- *
- * @return Current owning worker information
- */
- WorkerInfo getWorkerInfo();
-
- /**
- * Assign the worker currently responsible for this partition.
- *
- * @param workerInfo Worker now owning the partition
- */
- void setWorkerInfo(WorkerInfo workerInfo);
-
- /**
- * Worker that owned this partition before the current owner.
- *
- * @return Previous owning worker information, or null if there was none
- */
- WorkerInfo getPreviousWorkerInfo();
-
- /**
- * Record the worker that owned this partition before the current owner.
- *
- * @param workerInfo Worker that previously owned the partition
- */
- void setPreviousWorkerInfo(WorkerInfo workerInfo);
-
- /**
- * When restarting from a checkpoint, tells the worker where on HDFS the
- * checkpointed data for this partition was written.
- *
- * @return HDFS checkpoint file prefix for this partition, or null when the
- * superstep is not a restart
- */
- String getCheckpointFilesPrefix();
-
- /**
- * Set the HDFS checkpoint file prefix; intended to be called by the master.
- *
- * @param checkpointFilesPrefix HDFS checkpoint file prefix
- */
- void setCheckpointFilesPrefix(String checkpointFilesPrefix);
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java
deleted file mode 100644
index 6ee0228..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionStats.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.giraph.graph.partition;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-
-/**
- * Used to keep track of statistics of every {@link Partition}. Contains no
- * actual partition data, only the statistics.
- */
-public class PartitionStats implements Writable {
- /** Id of partition to keep stats for (-1 until set) */
- private int partitionId = -1;
- /** Vertices in this partition */
- private long vertexCount = 0;
- /** Finished vertices in this partition */
- private long finishedVertexCount = 0;
- /** Edges in this partition */
- private long edgeCount = 0;
- /** Messages sent from this partition */
- private long messagesSentCount = 0;
-
- /**
- * Default constructor for reflection.
- */
- public PartitionStats() { }
-
- /**
- * Constructor with the initial stats.
- *
- * @param partitionId Partition id.
- * @param vertexCount Vertex count.
- * @param finishedVertexCount Finished vertex count.
- * @param edgeCount Edge count.
- * @param messagesSentCount Number of messages sent
- */
- public PartitionStats(int partitionId,
- long vertexCount,
- long finishedVertexCount,
- long edgeCount,
- long messagesSentCount) {
- this.partitionId = partitionId;
- this.vertexCount = vertexCount;
- this.finishedVertexCount = finishedVertexCount;
- this.edgeCount = edgeCount;
- this.messagesSentCount = messagesSentCount;
- }
-
- /**
- * Set the partition id.
- *
- * @param partitionId New partition id.
- */
- public void setPartitionId(int partitionId) {
- this.partitionId = partitionId;
- }
-
- /**
- * Get partition id.
- *
- * @return Partition id, or -1 if it was never set.
- */
- public int getPartitionId() {
- return partitionId;
- }
-
- /**
- * Increment the vertex count by one.
- */
- public void incrVertexCount() {
- ++vertexCount;
- }
-
- /**
- * Get the vertex count.
- *
- * @return Vertex count.
- */
- public long getVertexCount() {
- return vertexCount;
- }
-
- /**
- * Increment the finished vertex count by one.
- */
- public void incrFinishedVertexCount() {
- ++finishedVertexCount;
- }
-
- /**
- * Get the finished vertex count.
- *
- * @return Finished vertex count.
- */
- public long getFinishedVertexCount() {
- return finishedVertexCount;
- }
-
- /**
- * Add edges to the edge count.
- *
- * @param edgeCount Number of edges to add.
- */
- public void addEdgeCount(long edgeCount) {
- this.edgeCount += edgeCount;
- }
-
- /**
- * Get the edge count.
- *
- * @return Edge count.
- */
- public long getEdgeCount() {
- return edgeCount;
- }
-
- /**
- * Add messages to the messages sent count.
- *
- * @param messagesSentCount Number of messages to add.
- */
- public void addMessagesSentCount(long messagesSentCount) {
- this.messagesSentCount += messagesSentCount;
- }
-
- /**
- * Get the messages sent count.
- *
- * @return Messages sent count.
- */
- public long getMessagesSentCount() {
- return messagesSentCount;
- }
-
- /**
- * Deserialize; field order must mirror {@link #write(DataOutput)}.
- *
- * @param input Source of the serialized stats
- * @throws IOException On read failure
- */
- @Override
- public void readFields(DataInput input) throws IOException {
- partitionId = input.readInt();
- vertexCount = input.readLong();
- finishedVertexCount = input.readLong();
- edgeCount = input.readLong();
- messagesSentCount = input.readLong();
- }
-
- /**
- * Serialize; field order must mirror {@link #readFields(DataInput)}.
- *
- * @param output Destination for the serialized stats
- * @throws IOException On write failure
- */
- @Override
- public void write(DataOutput output) throws IOException {
- output.writeInt(partitionId);
- output.writeLong(vertexCount);
- output.writeLong(finishedVertexCount);
- output.writeLong(edgeCount);
- output.writeLong(messagesSentCount);
- }
-
- @Override
- public String toString() {
- return "(id=" + partitionId + ",vtx=" + vertexCount + ",finVtx=" +
- finishedVertexCount + ",edges=" + edgeCount + ",msgsSent=" +
- messagesSentCount + ")";
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java
deleted file mode 100644
index 07f55ed..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionStore.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph.partition;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Structure that stores partitions for a worker.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- */
-public abstract class PartitionStore<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable> {
-
- /**
- * Add a new partition to the store or just the vertices from the partition
- * to the old partition.
- *
- * @param partition Partition to add
- */
- public abstract void addPartition(Partition<I, V, E, M> partition);
-
- /**
- * Get a partition.
- *
- * @param partitionId Partition id
- * @return The requested partition
- */
- public abstract Partition<I, V, E, M> getPartition(Integer partitionId);
-
- /**
- * Remove a partition and return it.
- *
- * @param partitionId Partition id
- * @return The removed partition
- */
- public abstract Partition<I, V, E, M> removePartition(Integer partitionId);
-
- /**
- * Just delete a partition
- * (more efficient than {@link #removePartition(Integer partitionID)} if the
- * partition is out of core).
- *
- * @param partitionId Partition id
- */
- public abstract void deletePartition(Integer partitionId);
-
- /**
- * Whether a specific partition is present in the store.
- *
- * @param partitionId Partition id
- * @return True iff the partition is present
- */
- public abstract boolean hasPartition(Integer partitionId);
-
- /**
- * Return the ids of all the stored partitions as an Iterable.
- *
- * @return The partition ids
- */
- public abstract Iterable<Integer> getPartitionIds();
-
- /**
- * Return the number of stored partitions.
- *
- * @return The number of partitions
- */
- public abstract int getNumPartitions();
-
- /**
- * Whether the partition store is empty.
- *
- * @return True iff there are no partitions in the store
- */
- public boolean isEmpty() {
- return getNumPartitions() == 0;
- }
-
- /**
- * Return all the stored partitions as an Iterable. Note that this may force
- * out-of-core partitions to be loaded into memory if using out-of-core.
- *
- * @return The partition objects
- */
- public Iterable<Partition<I, V, E, M>> getPartitions() {
- return Iterables.transform(getPartitionIds(),
- new Function<Integer, Partition<I, V, E, M>>() {
- @Override
- public Partition<I, V, E, M> apply(Integer partitionId) {
- return getPartition(partitionId);
- }
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionUtils.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionUtils.java
deleted file mode 100644
index 5600dad..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/partition/PartitionUtils.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph.partition;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.giraph.graph.VertexEdgeCount;
-import org.apache.giraph.graph.WorkerInfo;
-import org.apache.log4j.Logger;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- * Helper class for {@link Partition} related operations.
- */
-public class PartitionUtils {
- /** Class logger */
- private static final Logger LOG = Logger.getLogger(PartitionUtils.class);
-
- /**
- * Do not construct this object.
- */
- private PartitionUtils() { }
-
- /**
- * Overflow-safe three-way comparison of two counts. Subtracting and casting
- * to int (the previous approach) truncates large differences and can flip
- * the sign, producing an inconsistent ordering.
- *
- * @param left First count
- * @param right Second count
- * @return Negative, zero or positive as left is less than, equal to, or
- * greater than right
- */
- private static int compareCounts(long left, long right) {
- if (left < right) {
- return -1;
- }
- return left == right ? 0 : 1;
- }
-
- /**
- * Orders Entry<WorkerInfo, VertexEdgeCount> objects by ascending edge count.
- */
- private static class EdgeCountComparator implements
- Comparator<Entry<WorkerInfo, VertexEdgeCount>> {
- @Override
- public int compare(Entry<WorkerInfo, VertexEdgeCount> worker1,
- Entry<WorkerInfo, VertexEdgeCount> worker2) {
- return compareCounts(worker1.getValue().getEdgeCount(),
- worker2.getValue().getEdgeCount());
- }
- }
-
- /**
- * Orders Entry<WorkerInfo, VertexEdgeCount> objects by ascending vertex
- * count.
- */
- private static class VertexCountComparator implements
- Comparator<Entry<WorkerInfo, VertexEdgeCount>> {
- @Override
- public int compare(Entry<WorkerInfo, VertexEdgeCount> worker1,
- Entry<WorkerInfo, VertexEdgeCount> worker2) {
- return compareCounts(worker1.getValue().getVertexCount(),
- worker2.getValue().getVertexCount());
- }
- }
-
- /**
- * Check for imbalances on a per worker basis, by calculating the
- * mean, high and low workers by edges and vertices, and logging them at
- * INFO level.
- *
- * @param partitionOwnerList List of partition owners.
- * @param allPartitionStats All the partition stats.
- * @throws IllegalStateException If two owners share a partition id, or a
- * stats entry refers to a partition with no owner.
- */
- public static void analyzePartitionStats(
- Collection<PartitionOwner> partitionOwnerList,
- List<PartitionStats> allPartitionStats) {
- // Index owners by partition id, rejecting duplicates.
- Map<Integer, PartitionOwner> idOwnerMap =
- new HashMap<Integer, PartitionOwner>();
- for (PartitionOwner partitionOwner : partitionOwnerList) {
- if (idOwnerMap.put(partitionOwner.getPartitionId(),
- partitionOwner) != null) {
- throw new IllegalStateException(
- "analyzePartitionStats: Duplicate partition " +
- partitionOwner);
- }
- }
-
- // Aggregate vertex/edge counts per owning worker and in total.
- Map<WorkerInfo, VertexEdgeCount> workerStatsMap = Maps.newHashMap();
- VertexEdgeCount totalVertexEdgeCount = new VertexEdgeCount();
- for (PartitionStats partitionStats : allPartitionStats) {
- PartitionOwner owner =
- idOwnerMap.get(partitionStats.getPartitionId());
- if (owner == null) {
- throw new IllegalStateException(
- "analyzePartitionStats: No owner for partition stats " +
- partitionStats);
- }
- WorkerInfo workerInfo = owner.getWorkerInfo();
- VertexEdgeCount vertexEdgeCount =
- workerStatsMap.get(workerInfo);
- if (vertexEdgeCount == null) {
- workerStatsMap.put(
- workerInfo,
- new VertexEdgeCount(partitionStats.getVertexCount(),
- partitionStats.getEdgeCount()));
- } else {
- workerStatsMap.put(
- workerInfo,
- vertexEdgeCount.incrVertexEdgeCount(
- partitionStats.getVertexCount(),
- partitionStats.getEdgeCount()));
- }
- totalVertexEdgeCount =
- totalVertexEdgeCount.incrVertexEdgeCount(
- partitionStats.getVertexCount(),
- partitionStats.getEdgeCount());
- }
-
- // With no workers the mean below would divide by zero and get(0) would
- // fail, so there is nothing meaningful to report.
- if (workerStatsMap.isEmpty()) {
- if (LOG.isInfoEnabled()) {
- LOG.info("analyzePartitionStats: No partition stats to analyze");
- }
- return;
- }
-
- List<Entry<WorkerInfo, VertexEdgeCount>> workerEntryList =
- Lists.newArrayList(workerStatsMap.entrySet());
-
- if (LOG.isInfoEnabled()) {
- Collections.sort(workerEntryList, new VertexCountComparator());
- LOG.info("analyzePartitionStats: Vertices - Mean: " +
- (totalVertexEdgeCount.getVertexCount() /
- workerStatsMap.size()) +
- ", Min: " +
- workerEntryList.get(0).getKey() + " - " +
- workerEntryList.get(0).getValue().getVertexCount() +
- ", Max: " +
- workerEntryList.get(workerEntryList.size() - 1).getKey() +
- " - " +
- workerEntryList.get(workerEntryList.size() - 1).
- getValue().getVertexCount());
- Collections.sort(workerEntryList, new EdgeCountComparator());
- LOG.info("analyzePartitionStats: Edges - Mean: " +
- (totalVertexEdgeCount.getEdgeCount() /
- workerStatsMap.size()) +
- ", Min: " +
- workerEntryList.get(0).getKey() + " - " +
- workerEntryList.get(0).getValue().getEdgeCount() +
- ", Max: " +
- workerEntryList.get(workerEntryList.size() - 1).getKey() +
- " - " +
- workerEntryList.get(workerEntryList.size() - 1).
- getValue().getEdgeCount());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangeMasterPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangeMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangeMasterPartitioner.java
deleted file mode 100644
index 8e417ec..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangeMasterPartitioner.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph.partition;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Partial master-side partitioner for range-based partitioning. This class
- * only supplies range-aware partition stats objects; the remaining
- * {@link MasterGraphPartitioner} methods are left to subclasses that know the
- * concrete vertex id type and want to control how many messages flow between
- * workers.
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message value
- */
-@SuppressWarnings("rawtypes")
-public abstract class RangeMasterPartitioner<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable> implements
- MasterGraphPartitioner<I, V, E, M> {
- /**
- * Create an empty stats holder that can carry a range split hint.
- *
- * @return A new {@link RangePartitionStats}
- */
- @Override
- public PartitionStats createPartitionStats() {
- RangePartitionStats<I> rangeStats = new RangePartitionStats<I>();
- return rangeStats;
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangePartitionOwner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangePartitionOwner.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangePartitionOwner.java
deleted file mode 100644
index 4dfe1e2..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangePartitionOwner.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph.partition;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * A {@link PartitionOwner} that additionally records the maximum vertex
- * index contained in its partition.
- *
- * @param <I> Vertex index type
- */
-@SuppressWarnings("rawtypes")
-public class RangePartitionOwner<I extends WritableComparable>
- extends BasicPartitionOwner {
- /** Max index for this partition (null until set or deserialized) */
- private I maxIndex;
-
- /**
- * Default constructor, used before {@link #readFields(DataInput)}.
- * The max index stays null until deserialized.
- */
- public RangePartitionOwner() { }
-
- /**
- * Constructor with the max index.
- *
- * @param maxIndex Max index of this partition.
- */
- public RangePartitionOwner(I maxIndex) {
- this.maxIndex = maxIndex;
- }
-
- /**
- * Get the maximum index of this partition owner.
- *
- * @return Maximum index, or null if never set.
- */
- public I getMaxIndex() {
- return maxIndex;
- }
-
- /**
- * Deserialize the base owner fields followed by the max index.
- *
- * @param input Serialized data source
- * @throws IOException On read failure
- */
- @Override
- @SuppressWarnings("unchecked")
- public void readFields(DataInput input) throws IOException {
- super.readFields(input);
- // The configuration creates an empty vertex id of the job's id class,
- // which is assumed here to match I.
- maxIndex = (I) getConf().createVertexId();
- maxIndex.readFields(input);
- }
-
- /**
- * Serialize the base owner fields followed by the max index.
- *
- * @param output Serialized data sink
- * @throws IOException On write failure
- * @throws IllegalStateException If no max index has been set
- */
- @Override
- public void write(DataOutput output) throws IOException {
- // Fail before emitting anything; otherwise the base fields would be
- // written and the stream left truncated by a NullPointerException.
- if (maxIndex == null) {
- throw new IllegalStateException(
- "write: maxIndex is null for partition owner " + this);
- }
- super.write(output);
- maxIndex.write(output);
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangePartitionStats.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangePartitionStats.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangePartitionStats.java
deleted file mode 100644
index 3ab43e8..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangePartitionStats.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph.partition;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * {@link PartitionStats} extended with an optional hint describing how a
- * range-based partition could be split.
- *
- * NOTE(review): nothing in this class sets a non-null hint except
- * readFields, and the hint it creates there is never given a configuration,
- * while RangeSplitHint.readFields calls conf.createVertexId(). Confirm that
- * a serialized hint can actually be read back.
- *
- * @param <I> Vertex index type
- */
-@SuppressWarnings("rawtypes")
-public class RangePartitionStats<I extends WritableComparable>
- extends PartitionStats {
- /** Optional splitting hint; null means no hint */
- private RangeSplitHint<I> hint;
-
- /**
- * Split hint for this partition's range, if one is present.
- *
- * @return The split hint, or null when there is none
- */
- public RangeSplitHint<I> getRangeSplitHint() {
- return hint;
- }
-
- /**
- * Read the base stats, then a presence flag, then the hint if the flag
- * is set.
- *
- * @param input Serialized data source
- * @throws IOException On read failure
- */
- @Override
- public void readFields(DataInput input) throws IOException {
- super.readFields(input);
- if (!input.readBoolean()) {
- hint = null;
- return;
- }
- hint = new RangeSplitHint<I>();
- hint.readFields(input);
- }
-
- /**
- * Write the base stats, then a presence flag, then the hint if present.
- *
- * @param output Serialized data sink
- * @throws IOException On write failure
- */
- @Override
- public void write(DataOutput output) throws IOException {
- super.write(output);
- boolean hasHint = hint != null;
- output.writeBoolean(hasHint);
- if (hasHint) {
- hint.write(output);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangePartitionerFactory.java
deleted file mode 100644
index 5855c0e..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangePartitionerFactory.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph.partition;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Base factory for range partitioning, which assigns vertices to partitions
- * by contiguous ranges of a generic vertex id type. When ids carry locality,
- * this can cut the number of messages sent between workers. The costs are
- * a higher risk of hot spots when ids are not randomly distributed, and the
- * need for users to implement part of the range-splitting logic themselves.
- *
- * See {@link RangeWorkerPartitioner}
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message value
- */
-@SuppressWarnings("rawtypes")
-public abstract class RangePartitionerFactory<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- implements GraphPartitionerFactory<I, V, E, M> {
- // Intentionally empty: all factory methods are supplied by subclasses.
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangeSplitHint.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangeSplitHint.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangeSplitHint.java
deleted file mode 100644
index e415b9b..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangeSplitHint.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph.partition;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfigurable;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Hint to the {@link RangeMasterPartitioner} about how a
- * {@link RangePartitionOwner} can be split.
- *
- * @param <I> Vertex index to split around
- */
-@SuppressWarnings("rawtypes")
-public class RangeSplitHint<I extends WritableComparable>
- implements Writable, ImmutableClassesGiraphConfigurable {
- /** Hinted split index */
- private I splitIndex;
- /** Number of vertices in this range before the split */
- private long preSplitVertexCount;
- /** Number of vertices in this range after the split */
- private long postSplitVertexCount;
- /** Configuration */
- private ImmutableClassesGiraphConfiguration<I, ?, ?, ?> conf;
-
- @Override
- public void readFields(DataInput input) throws IOException {
- splitIndex = conf.createVertexId();
- splitIndex.readFields(input);
- preSplitVertexCount = input.readLong();
- postSplitVertexCount = input.readLong();
- }
-
- @Override
- public void write(DataOutput output) throws IOException {
- splitIndex.write(output);
- output.writeLong(preSplitVertexCount);
- output.writeLong(postSplitVertexCount);
- }
-
- @Override
- public ImmutableClassesGiraphConfiguration getConf() {
- return conf;
- }
-
- @Override
- public void setConf(ImmutableClassesGiraphConfiguration conf) {
- this.conf = conf;
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java
deleted file mode 100644
index b963d86..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/partition/RangeWorkerPartitioner.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph.partition;
-
-import java.util.Collection;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-/**
- * Range partitioning will split the vertices by a key range based on a generic
- * type. This allows vertices that have some locality with the vertex ids
- * to reduce the amount of messages sent. The tradeoffs are that
- * range partitioning is more susceptible to hot spots if the keys
- * are not randomly distributed. Another negative is the user must implement
- * some of the functionality around how to split the key range.
- *
- * Note: This implementation is incomplete, the developer must implement the
- * various methods based on their index type.
- *
- * @param <I> Vertex index value
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message value
- */
-@SuppressWarnings("rawtypes")
-public abstract class RangeWorkerPartitioner<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable> implements
- WorkerGraphPartitioner<I, V, E, M> {
- /** Mapping of the vertex ids to the {@link PartitionOwner} */
- protected NavigableMap<I, RangePartitionOwner<I>> vertexRangeMap =
- new TreeMap<I, RangePartitionOwner<I>>();
-
- @Override
- public PartitionOwner createPartitionOwner() {
- return new RangePartitionOwner<I>();
- }
-
- @Override
- public PartitionOwner getPartitionOwner(I vertexId) {
- // Find the partition owner based on the maximum partition id.
- // If the vertex id exceeds any of the maximum partition ids, give
- // it to the last one
- if (vertexId == null) {
- throw new IllegalArgumentException(
- "getPartitionOwner: Illegal null vertex id");
- }
- I maxVertexIndex = vertexRangeMap.ceilingKey(vertexId);
- if (maxVertexIndex == null) {
- return vertexRangeMap.lastEntry().getValue();
- } else {
- return vertexRangeMap.get(vertexId);
- }
- }
-
- @Override
- public Collection<? extends PartitionOwner> getPartitionOwners() {
- return vertexRangeMap.values();
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/partition/SimplePartition.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/SimplePartition.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/SimplePartition.java
deleted file mode 100644
index 0706660..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/partition/SimplePartition.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph.partition;
-
-import org.apache.giraph.conf.GiraphConstants;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.Vertex;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.util.Progressable;
-
-import com.google.common.collect.Maps;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-
-/**
- * A simple map-based container that stores vertices. Vertex ids will map to
- * exactly one partition.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- */
-@SuppressWarnings("rawtypes")
-public class SimplePartition<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- implements Partition<I, V, E, M> {
- /** Configuration from the worker */
- private ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
- /** Partition id */
- private int id;
- /** Vertex map for this range (keyed by index) */
- private ConcurrentMap<I, Vertex<I, V, E, M>> vertexMap;
- /** Context used to report progress */
- private Progressable progressable;
-
- /**
- * Constructor for reflection.
- */
- public SimplePartition() { }
-
- @Override
- public void initialize(int partitionId, Progressable progressable) {
- setId(partitionId);
- setProgressable(progressable);
- if (conf.getBoolean(GiraphConstants.USE_OUT_OF_CORE_MESSAGES,
- GiraphConstants.USE_OUT_OF_CORE_MESSAGES_DEFAULT)) {
- vertexMap = new ConcurrentSkipListMap<I, Vertex<I, V, E, M>>();
- } else {
- vertexMap = Maps.newConcurrentMap();
- }
- }
-
- @Override
- public Vertex<I, V, E, M> getVertex(I vertexIndex) {
- return vertexMap.get(vertexIndex);
- }
-
- @Override
- public Vertex<I, V, E, M> putVertex(Vertex<I, V, E, M> vertex) {
- return vertexMap.put(vertex.getId(), vertex);
- }
-
- @Override
- public Vertex<I, V, E, M> removeVertex(I vertexIndex) {
- return vertexMap.remove(vertexIndex);
- }
-
- @Override
- public void addPartition(Partition<I, V, E, M> partition) {
- for (Vertex<I, V, E , M> vertex : partition) {
- vertexMap.put(vertex.getId(), vertex);
- }
- }
-
- @Override
- public long getVertexCount() {
- return vertexMap.size();
- }
-
- @Override
- public long getEdgeCount() {
- long edges = 0;
- for (Vertex<I, V, E, M> vertex : vertexMap.values()) {
- edges += vertex.getNumEdges();
- }
- return edges;
- }
-
- @Override
- public int getId() {
- return id;
- }
-
- @Override
- public void setId(int id) {
- this.id = id;
- }
-
- @Override
- public void setProgressable(Progressable progressable) {
- this.progressable = progressable;
- }
-
- @Override
- public void saveVertex(Vertex<I, V, E, M> vertex) {
- // No-op, vertices are stored as Java objects in this partition
- }
-
- @Override
- public String toString() {
- return "(id=" + getId() + ",V=" + vertexMap.size() + ")";
- }
-
- @Override
- public void readFields(DataInput input) throws IOException {
- if (conf.getBoolean(GiraphConstants.USE_OUT_OF_CORE_MESSAGES,
- GiraphConstants.USE_OUT_OF_CORE_MESSAGES_DEFAULT)) {
- vertexMap = new ConcurrentSkipListMap<I, Vertex<I, V, E, M>>();
- } else {
- vertexMap = Maps.newConcurrentMap();
- }
- id = input.readInt();
- int vertices = input.readInt();
- for (int i = 0; i < vertices; ++i) {
- Vertex<I, V, E, M> vertex = conf.createVertex();
- if (progressable != null) {
- progressable.progress();
- }
- vertex.readFields(input);
- if (vertexMap.put(vertex.getId(), vertex) != null) {
- throw new IllegalStateException(
- "readFields: " + this +
- " already has same id " + vertex);
- }
- }
- }
-
- @Override
- public void write(DataOutput output) throws IOException {
- output.writeInt(id);
- output.writeInt(vertexMap.size());
- for (Vertex vertex : vertexMap.values()) {
- if (progressable != null) {
- progressable.progress();
- }
- vertex.write(output);
- }
- }
-
- @Override
- public void setConf(
- ImmutableClassesGiraphConfiguration<I, V, E, M> configuration) {
- this.conf = configuration;
- }
-
- @Override
- public ImmutableClassesGiraphConfiguration<I, V, E, M> getConf() {
- return conf;
- }
-
- @Override
- public Iterator<Vertex<I, V, E, M>> iterator() {
- return vertexMap.values().iterator();
- }
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java
deleted file mode 100644
index 37f9cac..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/partition/SimplePartitionStore.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph.partition;
-
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Mapper;
-
-import com.google.common.collect.Maps;
-
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * A simple in-memory partition store.
- *
- * @param <I> Vertex id
- * @param <V> Vertex data
- * @param <E> Edge data
- * @param <M> Message data
- */
-public class SimplePartitionStore<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable>
- extends PartitionStore<I, V, E, M> {
- /** Map of stored partitions. */
- private final ConcurrentMap<Integer, Partition<I, V, E, M>> partitions =
- Maps.newConcurrentMap();
- /** Configuration. */
- private final ImmutableClassesGiraphConfiguration<I, V, E, M> conf;
- /** Context used to report progress */
- private final Mapper<?, ?, ?, ?>.Context context;
-
- /**
- * Constructor.
- *
- * @param conf Configuration
- * @param context Mapper context
- */
- public SimplePartitionStore(
- ImmutableClassesGiraphConfiguration<I, V, E, M> conf,
- Mapper<?, ?, ?, ?>.Context context) {
- this.conf = conf;
- this.context = context;
- }
-
- @Override
- public void addPartition(Partition<I, V, E, M> partition) {
- Partition<I, V, E, M> oldPartition = partitions.get(partition.getId());
- if (oldPartition == null) {
- oldPartition = partitions.putIfAbsent(partition.getId(), partition);
- if (oldPartition == null) {
- return;
- }
- }
- oldPartition.addPartition(partition);
- }
-
- @Override
- public Partition<I, V, E, M> getPartition(Integer partitionId) {
- return partitions.get(partitionId);
- }
-
- @Override
- public Partition<I, V, E, M> removePartition(Integer partitionId) {
- return partitions.remove(partitionId);
- }
-
- @Override
- public void deletePartition(Integer partitionId) {
- partitions.remove(partitionId);
- }
-
- @Override
- public boolean hasPartition(Integer partitionId) {
- return partitions.containsKey(partitionId);
- }
-
- @Override
- public Iterable<Integer> getPartitionIds() {
- return partitions.keySet();
- }
-
- @Override
- public int getNumPartitions() {
- return partitions.size();
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/partition/WorkerGraphPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/WorkerGraphPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/WorkerGraphPartitioner.java
deleted file mode 100644
index 2364cc1..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/partition/WorkerGraphPartitioner.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.graph.partition;
-
-import org.apache.giraph.graph.WorkerInfo;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-
-import java.util.Collection;
-
-/**
- * Stores the {@link PartitionOwner} objects from the master and provides the
- * mapping of vertex to {@link PartitionOwner}. Also generates the partition
- * owner implementation.
- *
- * @param <I> Vertex id
- * @param <V> Vertex value
- * @param <E> Edge value
- * @param <M> Message data
- */
-@SuppressWarnings("rawtypes")
-public interface WorkerGraphPartitioner<I extends WritableComparable,
- V extends Writable, E extends Writable, M extends Writable> {
- /**
- * Instantiate the {@link PartitionOwner} implementation used to read the
- * master assignments.
- *
- * @return Instantiated {@link PartitionOwner} object
- */
- PartitionOwner createPartitionOwner();
-
- /**
- * Figure out the owner of a vertex
- *
- * @param vertexId Vertex id to get the partition for
- * @return Correct partition owner
- */
- PartitionOwner getPartitionOwner(I vertexId);
-
- /**
- * At the end of a superstep, workers have {@link PartitionStats} generated
- * for each of their partitions. This method will allow the user to
- * modify or create their own {@link PartitionStats} interfaces to send to
- * the master.
- *
- * @param workerPartitionStats Stats generated by the infrastructure during
- * the superstep
- * @param partitionStore Partition store for this worker
- * (could be used to provide more useful stat information)
- * @return Final partition stats
- */
- Collection<PartitionStats> finalizePartitionStats(
- Collection<PartitionStats> workerPartitionStats,
- PartitionStore<I, V, E, M> partitionStore);
-
- /**
- * Get the partitions owners and update locally. Returns the partitions
- * to send to other workers and other dependencies.
- *
- * @param myWorkerInfo Worker info.
- * @param masterSetPartitionOwners Master set partition owners, received
- * prior to beginning the superstep
- * @param partitionStore Partition store for this worker
- * (can be used to fill the return map of partitions to send)
- * @return Information for the partition exchange.
- */
- PartitionExchange updatePartitionOwners(
- WorkerInfo myWorkerInfo,
- Collection<? extends PartitionOwner> masterSetPartitionOwners,
- PartitionStore<I, V, E, M> partitionStore);
-
- /**
- * Get a collection of the {@link PartitionOwner} objects.
- *
- * @return Collection of owners for every partition.
- */
- Collection<? extends PartitionOwner> getPartitionOwners();
-}
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/graph/partition/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/partition/package-info.java b/giraph-core/src/main/java/org/apache/giraph/graph/partition/package-info.java
deleted file mode 100644
index 4d6f6c3..0000000
--- a/giraph-core/src/main/java/org/apache/giraph/graph/partition/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Package of partitioning related objects.
- */
-package org.apache.giraph.graph.partition;
http://git-wip-us.apache.org/repos/asf/giraph/blob/1684891e/giraph-core/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java
index 5a6c12d..c03d718 100644
--- a/giraph-core/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java
+++ b/giraph-core/src/main/java/org/apache/giraph/integration/SuperstepHashPartitionerFactory.java
@@ -23,13 +23,13 @@ import java.util.Collection;
import java.util.List;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.graph.WorkerInfo;
-import org.apache.giraph.graph.partition.BasicPartitionOwner;
-import org.apache.giraph.graph.partition.HashMasterPartitioner;
-import org.apache.giraph.graph.partition.HashPartitionerFactory;
-import org.apache.giraph.graph.partition.MasterGraphPartitioner;
-import org.apache.giraph.graph.partition.PartitionOwner;
-import org.apache.giraph.graph.partition.PartitionStats;
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.giraph.partition.BasicPartitionOwner;
+import org.apache.giraph.partition.HashMasterPartitioner;
+import org.apache.giraph.partition.HashPartitionerFactory;
+import org.apache.giraph.partition.MasterGraphPartitioner;
+import org.apache.giraph.partition.PartitionOwner;
+import org.apache.giraph.partition.PartitionStats;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.log4j.Logger;