You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ap...@apache.org on 2013/02/26 01:14:50 UTC

git commit: Add missing files from GIRAPH-535

Updated Branches:
  refs/heads/trunk 507959dcb -> c33ea10c4


Add missing files from GIRAPH-535


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/c33ea10c
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/c33ea10c
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/c33ea10c

Branch: refs/heads/trunk
Commit: c33ea10c40d0712352171b8f086dd4fcb70a43c3
Parents: 507959d
Author: Alessandro Presta <al...@fb.com>
Authored: Mon Feb 25 16:14:20 2013 -0800
Committer: Alessandro Presta <al...@fb.com>
Committed: Mon Feb 25 16:14:20 2013 -0800

----------------------------------------------------------------------
 .../formats/PseudoRandomInputFormatConstants.java  |   39 +++++
 .../io/formats/PseudoRandomLocalEdgesHelper.java   |   96 ++++++++++++
 .../SimpleIntRangePartitionerFactory.java          |   78 ++++++++++
 .../SimpleLongRangePartitionerFactory.java         |   78 ++++++++++
 .../partition/SimpleRangeMasterPartitioner.java    |  117 +++++++++++++++
 .../partition/SimpleRangeWorkerPartitioner.java    |  109 ++++++++++++++
 6 files changed, 517 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/c33ea10c/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomInputFormatConstants.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomInputFormatConstants.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomInputFormatConstants.java
new file mode 100644
index 0000000..3497de4
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomInputFormatConstants.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats;
+
+/**
+ * Configuration keys and defaults for the pseudo-random input formats.
+ */
+public final class PseudoRandomInputFormatConstants {
+  /** Configuration key: number of aggregate vertices. */
+  public static final String AGGREGATE_VERTICES =
+      "giraph.pseudoRandomInputFormat.aggregateVertices";
+  /** Configuration key: edges per vertex (pseudo-random destination). */
+  public static final String EDGES_PER_VERTEX =
+      "giraph.pseudoRandomInputFormat.edgesPerVertex";
+  /** Configuration key: minimum ratio of partition-local edges. */
+  public static final String LOCAL_EDGES_MIN_RATIO =
+      "giraph.pseudoRandomInputFormat.localEdgesMinRatio";
+  /** Default minimum ratio of partition-local edges. */
+  public static final float LOCAL_EDGES_MIN_RATIO_DEFAULT = 0f;
+
+  /** Do not construct. */
+  private PseudoRandomInputFormatConstants() { }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/c33ea10c/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomLocalEdgesHelper.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomLocalEdgesHelper.java b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomLocalEdgesHelper.java
new file mode 100644
index 0000000..84502e1
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/io/formats/PseudoRandomLocalEdgesHelper.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io.formats;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.partition.PartitionUtils;
+import org.apache.giraph.partition.SimpleLongRangePartitionerFactory;
+import org.apache.giraph.worker.WorkerInfo;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Helper that picks pseudo-random edge destinations with tunable locality.
+ */
+public class PseudoRandomLocalEdgesHelper {
+  /** Minimum ratio of partition-local edges. */
+  private final float minLocalEdgesRatio;
+  /** True for range-partitioning; otherwise hash-partitioning is assumed. */
+  private final boolean usingRangePartitioner;
+  /** Total number of vertices. */
+  private final long numVertices;
+  /** Total number of partitions. */
+  private final int numPartitions;
+  /** Average partition size (at least 1, so it is always a valid divisor). */
+  private final long partitionSize;
+
+  /**
+   * Constructor.
+   *
+   * @param numVertices Total number of vertices.
+   * @param minLocalEdgesRatio Minimum ratio of local edges.
+   * @param conf Configuration.
+   */
+  public PseudoRandomLocalEdgesHelper(long numVertices,
+                                      float minLocalEdgesRatio,
+                                      ImmutableClassesGiraphConfiguration conf)
+  {
+    this.minLocalEdgesRatio = minLocalEdgesRatio;
+    this.numVertices = numVertices;
+    usingRangePartitioner =
+        SimpleLongRangePartitionerFactory.class.isAssignableFrom(
+            conf.getGraphPartitionerClass());
+    int numWorkers = conf.getMaxWorkers();
+    List<WorkerInfo> workerInfos = Collections.nCopies(numWorkers,
+        new WorkerInfo());
+    numPartitions = PartitionUtils.computePartitionCount(workerInfos,
+        numWorkers, conf);
+    partitionSize = Math.max(1L, numVertices / numPartitions);
+  }
+
+  /**
+   * Generate a destination vertex id for the given source vertex,
+   * using the desired configuration for edge locality and the provided
+   * pseudo-random generator.
+   *
+   * @param sourceVertexId Source vertex id.
+   * @param rand Pseudo-random generator.
+   * @return Destination vertex id.
+   */
+  public long generateDestVertex(long sourceVertexId, Random rand) {
+    if (rand.nextFloat() < minLocalEdgesRatio) {
+      // Local edge. Math.abs() is applied after the modulo because
+      // Math.abs(Long.MIN_VALUE) is itself negative.
+      long offset = Math.abs(rand.nextLong() % partitionSize);
+      if (usingRangePartitioner) {
+        int rangePartitionId = Math.min(numPartitions - 1,
+            (int) (sourceVertexId / partitionSize));
+        return rangePartitionId * partitionSize + offset;
+      }
+      // NOTE(review): the cast binds tighter than %, so only the low 32 bits
+      // of the id pick the partition -- confirm this is intended.
+      int hashPartitionId = (int) sourceVertexId % numPartitions;
+      return hashPartitionId + numPartitions * offset;
+    }
+    // Otherwise pick a pseudo-random vertex in [0, numVertices).
+    return Math.abs(rand.nextLong() % numVertices);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/c33ea10c/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java
new file mode 100644
index 0000000..9ac2e11
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleIntRangePartitionerFactory.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Factory for simple range-based partitioners based on integer vertex ids.
+ * Workers are assigned equal-sized ranges of partitions,
+ * and partitions are assigned equal-sized ranges of vertices.
+ *
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ * @param <M> Message data type
+ */
+public class SimpleIntRangePartitionerFactory<V extends Writable,
+    E extends Writable, M extends Writable>
+    implements GraphPartitionerFactory<IntWritable, V, E, M> {
+  /** Configuration. */
+  private ImmutableClassesGiraphConfiguration conf;
+  /** Vertex key space size (validated as positive in setConf). */
+  private long keySpaceSize;
+
+  @Override
+  public MasterGraphPartitioner<IntWritable, V, E, M>
+  createMasterGraphPartitioner() {
+    return new SimpleRangeMasterPartitioner<IntWritable, V, E, M>(conf);
+  }
+
+  @Override
+  public WorkerGraphPartitioner<IntWritable, V, E, M>
+  createWorkerGraphPartitioner() {
+    return new SimpleRangeWorkerPartitioner<IntWritable, V, E, M>(
+        keySpaceSize) {
+      @Override
+      protected long vertexKeyFromId(IntWritable id) {
+        // Safeguard only; note a negative id still gives a negative key.
+        return id.get() % keySpaceSize;
+      }
+    };
+  }
+
+  @Override
+  public void setConf(ImmutableClassesGiraphConfiguration conf) {
+    this.conf = conf;
+    keySpaceSize = conf.getLong(GiraphConstants.PARTITION_VERTEX_KEY_SPACE_SIZE,
+        -1);
+    if (keySpaceSize <= 0) {
+      throw new IllegalStateException("Need to specify a positive " +
+          GiraphConstants.PARTITION_VERTEX_KEY_SPACE_SIZE + " when using " +
+          "SimpleRangePartitioner");
+    }
+  }
+
+  @Override
+  public ImmutableClassesGiraphConfiguration getConf() {
+    return conf;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/c33ea10c/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java
new file mode 100644
index 0000000..5772a7b
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleLongRangePartitionerFactory.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Factory for simple range-based partitioners based on long vertex ids.
+ * Workers are assigned equal-sized ranges of partitions,
+ * and partitions are assigned equal-sized ranges of vertices.
+ *
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ * @param <M> Message data type
+ */
+public class SimpleLongRangePartitionerFactory<V extends Writable,
+    E extends Writable, M extends Writable>
+    implements GraphPartitionerFactory<LongWritable, V, E, M> {
+  /** Configuration. */
+  private ImmutableClassesGiraphConfiguration conf;
+  /** Vertex key space size (validated as positive in setConf). */
+  private long keySpaceSize;
+
+  @Override
+  public MasterGraphPartitioner<LongWritable, V, E, M>
+  createMasterGraphPartitioner() {
+    return new SimpleRangeMasterPartitioner<LongWritable, V, E, M>(conf);
+  }
+
+  @Override
+  public WorkerGraphPartitioner<LongWritable, V, E, M>
+  createWorkerGraphPartitioner() {
+    return new SimpleRangeWorkerPartitioner<LongWritable, V, E, M>(
+        keySpaceSize) {
+      @Override
+      protected long vertexKeyFromId(LongWritable id) {
+        // Safeguard only; note a negative id still gives a negative key.
+        return id.get() % keySpaceSize;
+      }
+    };
+  }
+
+  @Override
+  public void setConf(ImmutableClassesGiraphConfiguration conf) {
+    this.conf = conf;
+    keySpaceSize = conf.getLong(GiraphConstants.PARTITION_VERTEX_KEY_SPACE_SIZE,
+        -1);
+    if (keySpaceSize <= 0) {
+      throw new IllegalStateException("Need to specify a positive " +
+          GiraphConstants.PARTITION_VERTEX_KEY_SPACE_SIZE + " when using " +
+          "SimpleRangePartitioner");
+    }
+  }
+
+  @Override
+  public ImmutableClassesGiraphConfiguration getConf() {
+    return conf;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/c33ea10c/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeMasterPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeMasterPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeMasterPartitioner.java
new file mode 100644
index 0000000..bf34ecd
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeMasterPartitioner.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A range-based master partitioner where equal-sized ranges of partitions
+ * are deterministically assigned to workers.
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ * @param <M> Message data type
+ */
+public class SimpleRangeMasterPartitioner<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable> implements
+    MasterGraphPartitioner<I, V, E, M> {
+  /** Class logger */
+  private static final Logger LOG =
+      Logger.getLogger(SimpleRangeMasterPartitioner.class);
+  /** Provided configuration */
+  private final ImmutableClassesGiraphConfiguration conf;
+  /** Save the last generated partition owner list */
+  private List<PartitionOwner> partitionOwnerList;
+
+  /**
+   * Constructor.
+   *
+   * @param conf Configuration used.
+   */
+  public SimpleRangeMasterPartitioner(
+      ImmutableClassesGiraphConfiguration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Collection<PartitionOwner> createInitialPartitionOwners(
+      Collection<WorkerInfo> availableWorkerInfos, int maxWorkers) {
+    int partitionCount = PartitionUtils.computePartitionCount(
+        availableWorkerInfos, maxWorkers, conf);
+    // May be 0 when there are fewer partitions than workers; in that case
+    // every partition is handed out by the remainder pass below.
+    int rangeSize = partitionCount / availableWorkerInfos.size();
+
+    partitionOwnerList = new ArrayList<PartitionOwner>();
+
+    // Give each worker one contiguous range of rangeSize partition ids,
+    // in the iteration order of availableWorkerInfos.
+    int partitionId = 0;
+    for (WorkerInfo worker : availableWorkerInfos) {
+      int rangeEnd = partitionId + rangeSize;
+      for (; partitionId < rangeEnd; ++partitionId) {
+        partitionOwnerList.add(new BasicPartitionOwner(partitionId, worker));
+      }
+    }
+
+    // Distribute the remainder (always fewer partitions than workers),
+    // one partition per worker, starting again from the first worker.
+    Iterator<WorkerInfo> workerIt = availableWorkerInfos.iterator();
+    for (; partitionId < partitionCount; ++partitionId) {
+      partitionOwnerList.add(
+          new BasicPartitionOwner(partitionId, workerIt.next()));
+    }
+
+    return partitionOwnerList;
+  }
+
+  @Override
+  public Collection<PartitionOwner> generateChangedPartitionOwners(
+      Collection<PartitionStats> allPartitionStatsList,
+      Collection<WorkerInfo> availableWorkers,
+      int maxWorkers,
+      long superstep) {
+    return PartitionBalancer.balancePartitionsAcrossWorkers(
+        conf,
+        partitionOwnerList,
+        allPartitionStatsList,
+        availableWorkers);
+  }
+
+  @Override
+  public Collection<PartitionOwner> getCurrentPartitionOwners() {
+    return partitionOwnerList;
+  }
+
+  @Override
+  public PartitionStats createPartitionStats() {
+    return new PartitionStats();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/c33ea10c/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeWorkerPartitioner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeWorkerPartitioner.java b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeWorkerPartitioner.java
new file mode 100644
index 0000000..f94c14b
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/SimpleRangeWorkerPartitioner.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.partition;
+
+import com.google.common.collect.Lists;
+import org.apache.giraph.worker.WorkerInfo;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * A range-based worker partitioner where equal-sized ranges of vertex ids
+ * are deterministically assigned to partitions.
+ * The user has to define a mapping from vertex ids to long keys dense in
+ * [0, keySpaceSize).
+ *
+ * @param <I> Vertex id type
+ * @param <V> Vertex value type
+ * @param <E> Edge value type
+ * @param <M> Message data type
+ */
+public abstract class SimpleRangeWorkerPartitioner<I extends
+    WritableComparable, V extends Writable, E extends Writable,
+    M extends Writable>
+    implements WorkerGraphPartitioner<I, V, E, M> {
+  /** List of {@link PartitionOwner}s for this worker. */
+  private final List<PartitionOwner> partitionOwnerList = Lists.newArrayList();
+  /** Vertex keys space size. */
+  private final long keySpaceSize;
+
+  /**
+   * Constructor.
+   *
+   * @param keySpaceSize Vertex keys space size.
+   */
+  public SimpleRangeWorkerPartitioner(long keySpaceSize) {
+    this.keySpaceSize = keySpaceSize;
+  }
+
+  /**
+   * Get key space size (can be used when implementing vertexKeyFromId()).
+   *
+   * @return Key space size.
+   */
+  public long getKeySpaceSize() {
+    return keySpaceSize;
+  }
+
+  /**
+   * Convert a vertex id to a unique long key in [0, keySpaceSize).
+   *
+   * @param id Vertex id
+   * @return Unique long key
+   */
+  protected abstract long vertexKeyFromId(I id);
+
+  @Override
+  public PartitionOwner createPartitionOwner() {
+    return new BasicPartitionOwner();
+  }
+
+  @Override
+  public PartitionOwner getPartitionOwner(I vertexId) {
+    // rangeSize >= 1 avoids "/ by zero"; clamp as long before narrowing.
+    long rangeSize = Math.max(1L, keySpaceSize / partitionOwnerList.size());
+    return partitionOwnerList.get((int) Math.min(
+        vertexKeyFromId(vertexId) / rangeSize, partitionOwnerList.size() - 1));
+  }
+
+  @Override
+  public Collection<PartitionStats> finalizePartitionStats(
+      Collection<PartitionStats> workerPartitionStats,
+      PartitionStore<I, V, E, M> partitionStore) {
+    // No modification necessary
+    return workerPartitionStats;
+  }
+
+  @Override
+  public PartitionExchange updatePartitionOwners(
+      WorkerInfo myWorkerInfo,
+      Collection<? extends PartitionOwner> masterSetPartitionOwners,
+      PartitionStore<I, V, E, M> partitionStore) {
+    return PartitionBalancer.updatePartitionOwners(partitionOwnerList,
+        myWorkerInfo, masterSetPartitionOwners, partitionStore);
+  }
+
+  @Override
+  public Collection<? extends PartitionOwner> getPartitionOwners() {
+    return partitionOwnerList;
+  }
+}