You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@giraph.apache.org by ma...@apache.org on 2016/07/09 00:06:33 UTC

git commit: updated refs/heads/trunk to 28cbe03

Repository: giraph
Updated Branches:
  refs/heads/trunk 819f293f4 -> 28cbe037c


GIRAPH-1082: Remove limit on the number of partitions

Summary: Currently we have a limit on how many partitions we can have because we write all partition information to a single ZooKeeper znode, and the amount of data a znode can hold is limited. We can instead send this information in requests and remove the hard limit.

Test Plan: Ran pagerank for 100 iterations with 500k partitions.

Differential Revision: https://reviews.facebook.net/D60267


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/28cbe037
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/28cbe037
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/28cbe037

Branch: refs/heads/trunk
Commit: 28cbe037cf9299ed6a089cc78039d0a16d0116ce
Parents: 819f293
Author: Maja Kabiljo <ma...@fb.com>
Authored: Fri Jul 1 07:39:25 2016 -0700
Committer: Maja Kabiljo <ma...@fb.com>
Committed: Thu Jul 7 15:45:33 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/giraph/bsp/BspService.java  | 36 ---------
 .../giraph/bsp/CentralizedServiceWorker.java    |  9 +++
 .../requests/AddressesAndPartitionsRequest.java | 72 +++++++++++++++++
 .../comm/requests/PartitionStatsRequest.java    | 80 +++++++++++++++++++
 .../giraph/comm/requests/RequestType.java       |  6 +-
 .../graph/AddressesAndPartitionsWritable.java   | 39 ++++------
 .../apache/giraph/master/BspServiceMaster.java  | 38 ++++-----
 .../giraph/master/MasterGlobalCommHandler.java  | 32 ++++++++
 .../apache/giraph/partition/PartitionUtils.java | 43 -----------
 .../giraph/utils/BlockingElementsSet.java       | 81 ++++++++++++++++++++
 .../apache/giraph/utils/ProgressableUtils.java  | 58 ++++++++++++++
 .../apache/giraph/worker/BspServiceWorker.java  | 49 ++++--------
 12 files changed, 384 insertions(+), 159 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/28cbe037/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
index 9545a25..a2caf81 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
@@ -98,9 +98,6 @@ public abstract class BspService<I extends WritableComparable,
       "/_workerWroteCheckpointDir";
   /** Finished workers notify here */
   public static final String WORKER_FINISHED_DIR = "/_workerFinishedDir";
-  /** Where the master and worker addresses and partition assignments are set */
-  public static final String ADDRESSES_AND_PARTITIONS_DIR =
-      "/_addressesAndPartitions";
   /** Helps coordinate the partition exchnages */
   public static final String PARTITION_EXCHANGE_DIR =
       "/_partitionExchangeDir";
@@ -114,9 +111,6 @@ public abstract class BspService<I extends WritableComparable,
   public static final String FORCE_CHECKPOINT_USER_FLAG = "/_checkpointAndStop";
   /** Denotes which workers have been cleaned up */
   public static final String CLEANED_UP_DIR = "/_cleanedUpDir";
-  /** JSON partition stats key */
-  public static final String JSONOBJ_PARTITION_STATS_KEY =
-      "_partitionStatsKey";
   /** JSON message count key */
   public static final String JSONOBJ_NUM_MESSAGES_KEY = "_numMsgsKey";
   /** JSON message bytes count key */
@@ -167,8 +161,6 @@ public abstract class BspService<I extends WritableComparable,
   private final BspEvent connectedEvent;
   /** Has worker registration changed (either healthy or unhealthy) */
   private final BspEvent workerHealthRegistrationChanged;
-  /** Are the addresses and partition assignments to workers ready? */
-  private final BspEvent addressesAndPartitionsReadyChanged;
   /** Application attempt changed */
   private final BspEvent applicationAttemptChanged;
   /** Input splits worker done */
@@ -220,7 +212,6 @@ public abstract class BspService<I extends WritableComparable,
       GraphTaskManager<I, V, E> graphTaskManager) {
     this.connectedEvent = new PredicateLock(context);
     this.workerHealthRegistrationChanged = new PredicateLock(context);
-    this.addressesAndPartitionsReadyChanged = new PredicateLock(context);
     this.applicationAttemptChanged = new PredicateLock(context);
     this.inputSplitsWorkerDoneEvent = new PredicateLock(context);
     this.inputSplitsAllDoneEvent = new PredicateLock(context);
@@ -232,7 +223,6 @@ public abstract class BspService<I extends WritableComparable,
     registerBspEvent(workerHealthRegistrationChanged);
     registerBspEvent(inputSplitsWorkerDoneEvent);
     registerBspEvent(inputSplitsAllDoneEvent);
-    registerBspEvent(addressesAndPartitionsReadyChanged);
     registerBspEvent(applicationAttemptChanged);
     registerBspEvent(superstepFinished);
     registerBspEvent(masterElectionChildrenChanged);
@@ -427,19 +417,6 @@ public abstract class BspService<I extends WritableComparable,
   }
 
   /**
-   * Generate the "addresses and partitions" directory path for a superstep
-   *
-   * @param attempt application attempt number
-   * @param superstep superstep to use
-   * @return directory path based on the a superstep
-   */
-  public final String getAddressesAndPartitionsPath(long attempt,
-      long superstep) {
-    return applicationAttemptsPath + "/" + attempt +
-        SUPERSTEP_DIR + "/" + superstep + ADDRESSES_AND_PARTITIONS_DIR;
-  }
-
-  /**
    * Generate the "partition exchange" directory path for a superstep
    *
    * @param attempt application attempt number
@@ -567,11 +544,6 @@ public abstract class BspService<I extends WritableComparable,
     return workerHealthRegistrationChanged;
   }
 
-  public final BspEvent getAddressesAndPartitionsReadyChangedEvent() {
-    return addressesAndPartitionsReadyChanged;
-  }
-
-
   public final BspEvent getApplicationAttemptChangedEvent() {
     return applicationAttemptChanged;
   }
@@ -895,14 +867,6 @@ public abstract class BspService<I extends WritableComparable,
       }
       inputSplitsWorkerDoneEvent.signal();
       eventProcessed = true;
-    } else if (event.getPath().contains(ADDRESSES_AND_PARTITIONS_DIR) &&
-        event.getType() == EventType.NodeCreated) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("process: partitionAssignmentsReadyChanged " +
-            "(partitions are assigned)");
-      }
-      addressesAndPartitionsReadyChanged.signal();
-      eventProcessed = true;
     } else if (event.getPath().contains(SUPERSTEP_FINISHED_NODE) &&
         event.getType() == EventType.NodeCreated) {
       if (LOG.isInfoEnabled()) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/28cbe037/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
index c3c16eb..5249829 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/CentralizedServiceWorker.java
@@ -20,6 +20,7 @@ package org.apache.giraph.bsp;
 
 import org.apache.giraph.comm.ServerData;
 import org.apache.giraph.comm.WorkerClient;
+import org.apache.giraph.graph.AddressesAndPartitionsWritable;
 import org.apache.giraph.graph.FinishedSuperstepStats;
 import org.apache.giraph.graph.GlobalStats;
 import org.apache.giraph.graph.GraphTaskManager;
@@ -245,4 +246,12 @@ public interface CentralizedServiceWorker<I extends WritableComparable,
    * @return Input splits handler
    */
   WorkerInputSplitsHandler getInputSplitsHandler();
+
+  /**
+   * Received addresses and partitions assignments from master.
+   *
+   * @param addressesAndPartitions Addresses and partitions assignment
+   */
+  void addressesAndPartitionsReceived(
+      AddressesAndPartitionsWritable addressesAndPartitions);
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/28cbe037/giraph-core/src/main/java/org/apache/giraph/comm/requests/AddressesAndPartitionsRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/AddressesAndPartitionsRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/AddressesAndPartitionsRequest.java
new file mode 100644
index 0000000..803cb7a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/AddressesAndPartitionsRequest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.requests;
+
+import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.graph.AddressesAndPartitionsWritable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Request with which the master sends addresses and partitions to workers
+ */
+public class AddressesAndPartitionsRequest extends WritableRequest
+    implements WorkerRequest {
+  /** Assignments to send; on receipt, populated by readFieldsRequest */
+  private AddressesAndPartitionsWritable addressesAndPartitions;
+
+  /** Constructor for reflection; payload is set by readFieldsRequest */
+  public AddressesAndPartitionsRequest() {
+  }
+
+  /**
+   * Constructor used when the payload to send is already available
+   *
+   * @param addressesAndPartitions Addresses and partitions to send
+   */
+  public AddressesAndPartitionsRequest(
+      AddressesAndPartitionsWritable addressesAndPartitions) {
+    this.addressesAndPartitions = addressesAndPartitions;
+  }
+
+  @Override
+  public void doRequest(ServerData serverData) {
+    serverData.getServiceWorker().addressesAndPartitionsReceived(
+        addressesAndPartitions);
+  }
+
+  @Override
+  public RequestType getType() {
+    return RequestType.ADDRESSES_AND_PARTITIONS_REQUEST;
+  }
+
+  @Override
+  void writeRequest(DataOutput output) throws IOException {
+    addressesAndPartitions.write(output);
+  }
+
+  @Override
+  void readFieldsRequest(DataInput input) throws IOException {
+    addressesAndPartitions =
+        new AddressesAndPartitionsWritable();
+    addressesAndPartitions.readFields(input);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/28cbe037/giraph-core/src/main/java/org/apache/giraph/comm/requests/PartitionStatsRequest.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/PartitionStatsRequest.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/PartitionStatsRequest.java
new file mode 100644
index 0000000..d3165d3
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/PartitionStatsRequest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.comm.requests;
+
+import org.apache.giraph.master.MasterGlobalCommHandler;
+import org.apache.giraph.partition.PartitionStats;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Request for sending one worker's partition stats to the master
+ */
+public class PartitionStatsRequest extends WritableRequest
+    implements MasterRequest {
+  /** Partition stats carried by this request */
+  private List<PartitionStats> partitionStats;
+
+  /**
+   * Constructor; keeps its own copy of the given collection
+   *
+   * @param partitionStats Partition stats to send
+   */
+  public PartitionStatsRequest(Collection<PartitionStats> partitionStats) {
+    this.partitionStats = new ArrayList<>(partitionStats);
+  }
+
+  /** Constructor for reflection; stats are set by readFieldsRequest */
+  public PartitionStatsRequest() {
+  }
+
+  @Override
+  public void doRequest(MasterGlobalCommHandler commHandler) {
+    commHandler.receivedPartitionStats(partitionStats);
+  }
+
+  @Override
+  public RequestType getType() {
+    return RequestType.PARTITION_STATS_REQUEST;
+  }
+
+  @Override
+  void writeRequest(DataOutput output) throws IOException {
+    output.writeInt(partitionStats.size()); // count, then each entry
+    for (PartitionStats partitionStat : partitionStats) {
+      partitionStat.write(output);
+    }
+  }
+
+  @Override
+  void readFieldsRequest(DataInput input) throws IOException {
+    int size = input.readInt(); // count written by writeRequest
+    partitionStats = new ArrayList<>(size);
+    for (int i = 0; i < size; i++) {
+      PartitionStats newPartitionStats = new PartitionStats();
+      newPartitionStats.readFields(input);
+      partitionStats.add(newPartitionStats);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/28cbe037/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java b/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
index 627c2af..ce1cd7c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/requests/RequestType.java
@@ -66,7 +66,11 @@ else[HADOOP_NON_SECURE]*/
   /** Send request with granted input split from master to workers */
   REPLY_WITH_INPUT_SPLIT_REQUEST(ReplyWithInputSplitRequest.class),
   /** Send request to resume sending messages (used in flow-control) */
-  SEND_RESUME_REQUEST(SendResumeRequest.class);
+  SEND_RESUME_REQUEST(SendResumeRequest.class),
+  /** Send addresses and partitions assignments from master to workers */
+  ADDRESSES_AND_PARTITIONS_REQUEST(AddressesAndPartitionsRequest.class),
+  /** Send partition stats from worker to master */
+  PARTITION_STATS_REQUEST(PartitionStatsRequest.class);
 
   /** Class of request which this type corresponds to */
   private final Class<? extends WritableRequest> requestClass;

http://git-wip-us.apache.org/repos/asf/giraph/blob/28cbe037/giraph-core/src/main/java/org/apache/giraph/graph/AddressesAndPartitionsWritable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/AddressesAndPartitionsWritable.java b/giraph-core/src/main/java/org/apache/giraph/graph/AddressesAndPartitionsWritable.java
index 1139610..352e3af 100644
--- a/giraph-core/src/main/java/org/apache/giraph/graph/AddressesAndPartitionsWritable.java
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/AddressesAndPartitionsWritable.java
@@ -20,6 +20,8 @@ package org.apache.giraph.graph;
 
 import org.apache.giraph.partition.PartitionOwner;
 import org.apache.giraph.master.MasterInfo;
+import org.apache.giraph.utils.ReflectionUtils;
+import org.apache.giraph.utils.WritableUtils;
 import org.apache.giraph.worker.WorkerInfo;
 import org.apache.hadoop.io.Writable;
 
@@ -44,8 +46,6 @@ public class AddressesAndPartitionsWritable implements Writable {
   private List<WorkerInfo> workerInfos;
   /** Collection of partitions */
   private Collection<PartitionOwner> partitionOwners;
-  /** Partition owner class, used to deserialize object */
-  private Class<? extends PartitionOwner> partitionOwnerClass;
 
   /**
    * Constructor when we want to serialize object
@@ -62,14 +62,8 @@ public class AddressesAndPartitionsWritable implements Writable {
     this.partitionOwners = partitionOwners;
   }
 
-  /**
-   * Constructor when we want to deserialize object
-   *
-   * @param partitionOwnerClass Partition owner class
-   */
-  public AddressesAndPartitionsWritable(
-      Class<? extends PartitionOwner> partitionOwnerClass) {
-    this.partitionOwnerClass = partitionOwnerClass;
+  /** Constructor for reflection */
+  public AddressesAndPartitionsWritable() {
   }
 
   /**
@@ -126,6 +120,10 @@ public class AddressesAndPartitionsWritable implements Writable {
     }
 
     output.writeInt(partitionOwners.size());
+    if (partitionOwners.size() > 0) {
+      WritableUtils.writeClass(
+          partitionOwners.iterator().next().getClass(), output);
+    }
     for (PartitionOwner partitionOwner : partitionOwners) {
       partitionOwner.writeWithWorkerIds(output);
     }
@@ -153,21 +151,16 @@ public class AddressesAndPartitionsWritable implements Writable {
     }
 
     int partitionOwnersSize = input.readInt();
+    Class<PartitionOwner> partitionOwnerClass = null;
+    if (partitionOwnersSize > 0) {
+      partitionOwnerClass = WritableUtils.readClass(input);
+    }
     partitionOwners = Lists.newArrayListWithCapacity(partitionOwnersSize);
     for (int i = 0; i < partitionOwnersSize; i++) {
-      try {
-        PartitionOwner partitionOwner = partitionOwnerClass.newInstance();
-        partitionOwner.readFieldsWithWorkerIds(input, workerInfoMap);
-        partitionOwners.add(partitionOwner);
-      } catch (InstantiationException e) {
-        throw new IllegalStateException("readFields: " +
-            "InstantiationException on partition owner class " +
-            partitionOwnerClass, e);
-      } catch (IllegalAccessException e) {
-        throw new IllegalStateException("readFields: " +
-            "IllegalAccessException on partition owner class " +
-            partitionOwnerClass, e);
-      }
+      PartitionOwner partitionOwner =
+          ReflectionUtils.newInstance(partitionOwnerClass);
+      partitionOwner.readFieldsWithWorkerIds(input, workerInfoMap);
+      partitionOwners.add(partitionOwner);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/28cbe037/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 605e818..00da53c 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -50,6 +50,7 @@ import org.apache.giraph.comm.MasterClient;
 import org.apache.giraph.comm.MasterServer;
 import org.apache.giraph.comm.netty.NettyMasterClient;
 import org.apache.giraph.comm.netty.NettyMasterServer;
+import org.apache.giraph.comm.requests.AddressesAndPartitionsRequest;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -888,8 +889,6 @@ public class BspServiceMaster<I extends WritableComparable,
   private GlobalStats aggregateWorkerStats(long superstep) {
     ImmutableClassesGiraphConfiguration conf = getConfiguration();
 
-    Class<? extends PartitionStats> partitionStatsClass =
-        masterGraphPartitioner.createPartitionStats().getClass();
     GlobalStats globalStats = new GlobalStats();
     // Get the stats from the all the worker selected nodes
     String workerFinishedPath =
@@ -909,7 +908,6 @@ public class BspServiceMaster<I extends WritableComparable,
 
     AggregatedMetrics aggregatedMetrics = new AggregatedMetrics();
 
-    allPartitionStatsList.clear();
     for (String finishedPath : workerFinishedPathList) {
       String hostnamePartitionId = FilenameUtils.getName(finishedPath);
       JSONObject workerFinishedInfoObj = null;
@@ -918,16 +916,6 @@ public class BspServiceMaster<I extends WritableComparable,
             getZkExt().getData(finishedPath, false, null);
         workerFinishedInfoObj = new JSONObject(new String(zkData,
             Charset.defaultCharset()));
-        List<PartitionStats> statsList =
-            WritableUtils.readListFieldsFromByteArray(
-                Base64.decode(workerFinishedInfoObj.getString(
-                    JSONOBJ_PARTITION_STATS_KEY)),
-                    partitionStatsClass,
-                    conf);
-        for (PartitionStats partitionStats : statsList) {
-          globalStats.addPartitionStats(partitionStats);
-          allPartitionStatsList.add(partitionStats);
-        }
         globalStats.addMessageCount(
             workerFinishedInfoObj.getLong(
                 JSONOBJ_NUM_MESSAGES_KEY));
@@ -969,6 +957,14 @@ public class BspServiceMaster<I extends WritableComparable,
       }
     }
 
+    allPartitionStatsList.clear();
+    Iterable<PartitionStats> statsList = globalCommHandler.getAllPartitionStats(
+        workerFinishedPathList.size(), getContext());
+    for (PartitionStats partitionStats : statsList) {
+      globalStats.addPartitionStats(partitionStats);
+      allPartitionStatsList.add(partitionStats);
+    }
+
     if (conf.metricsEnabled()) {
       if (GiraphConstants.METRICS_DIRECTORY.isDefaultValue(conf)) {
         aggregatedMetrics.print(superstep, System.err);
@@ -1141,18 +1137,16 @@ public class BspServiceMaster<I extends WritableComparable,
       }
     }
 
-    // Workers are waiting for these assignments
     AddressesAndPartitionsWritable addressesAndPartitions =
         new AddressesAndPartitionsWritable(masterInfo, chosenWorkerInfoList,
             partitionOwners);
-    String addressesAndPartitionsPath =
-        getAddressesAndPartitionsPath(getApplicationAttempt(),
-            getSuperstep());
-    WritableUtils.writeToZnode(
-        getZkExt(),
-        addressesAndPartitionsPath,
-        -1,
-        addressesAndPartitions);
+    // Send assignments to every worker
+    // TODO for very large number of partitions we might want to split this
+    // across multiple requests
+    for (WorkerInfo workerInfo : chosenWorkerInfoList) {
+      masterClient.sendWritableRequest(workerInfo.getTaskId(),
+          new AddressesAndPartitionsRequest(addressesAndPartitions));
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/28cbe037/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommHandler.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommHandler.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommHandler.java
index 717a24d..33427ad 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommHandler.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommHandler.java
@@ -19,8 +19,15 @@
 package org.apache.giraph.master;
 
 import org.apache.giraph.master.input.MasterInputSplitsHandler;
+import org.apache.giraph.partition.PartitionStats;
 import org.apache.giraph.reducers.ReduceOperation;
+import org.apache.giraph.utils.BlockingElementsSet;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.Progressable;
+
+import com.google.common.collect.Iterables;
+
+import java.util.List;
 
 /**
  * Handler for all master communications
@@ -30,6 +37,9 @@ public class MasterGlobalCommHandler implements MasterGlobalCommUsage {
   private final MasterAggregatorHandler aggregatorHandler;
   /** Input splits handler*/
   private final MasterInputSplitsHandler inputSplitsHandler;
+  /** Partition stats received from workers */
+  private final BlockingElementsSet<List<PartitionStats>> partitionStats =
+      new BlockingElementsSet<>();
 
   /**
    * Constructor
@@ -73,4 +83,26 @@ public class MasterGlobalCommHandler implements MasterGlobalCommUsage {
   public void broadcast(String name, Writable value) {
     aggregatorHandler.broadcast(name, value);
   }
+
+  /**
+   * Received partition stats from a worker
+   *
+   * @param partitionStats Partition stats
+   */
+  public void receivedPartitionStats(List<PartitionStats> partitionStats) {
+    this.partitionStats.offer(partitionStats);
+  }
+
+  /**
+   * Get all partition stats. Blocks until all workers have sent their stats
+   *
+   * @param numWorkers Number of workers to wait for
+   * @param progressable Progressable to report progress to
+   * @return All partition stats
+   */
+  public Iterable<PartitionStats> getAllPartitionStats(int numWorkers,
+      Progressable progressable) {
+    return Iterables.concat(
+        partitionStats.getElements(numWorkers, progressable));
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/28cbe037/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
index b607ed2..33fe114 100644
--- a/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/partition/PartitionUtils.java
@@ -209,49 +209,6 @@ public class PartitionUtils {
       LOG.info("computePartitionCount: Creating " +
           partitionCount + " partitions.");
     }
-    int maxPartitions = getMaxPartitions(conf);
-    if (partitionCount > maxPartitions) {
-      // try to keep partitionCount divisible by number of workers
-      // in order to keep the balance
-      int reducedPartitions = (maxPartitions / availableWorkerCount) *
-          availableWorkerCount;
-      if (reducedPartitions == 0) {
-        reducedPartitions = maxPartitions;
-      }
-      if (LOG.isInfoEnabled()) {
-        LOG.info("computePartitionCount: " +
-            "Reducing the partitionCount to " + reducedPartitions +
-            " from " + partitionCount + " because of " + maxPartitions +
-            " limit");
-      }
-      partitionCount = reducedPartitions;
-    }
-
     return partitionCount;
   }
-
-  /**
-   * Get the maximum number of partitions supported by Giraph.
-   *
-   * ZooKeeper has a limit of the data in a single znode of 1 MB,
-   * and we write all partition descriptions to the same znode.
-   *
-   * If we are not using checkpointing, each partition owner is serialized
-   * as 4 ints (16B), and we need some space to write the list of workers
-   * there. 50k partitions is conservative enough.
-   *
-   * When checkpointing is used, we need enough space to write all the
-   * checkpoint file paths.
-   *
-   * @param conf Configuration.
-   * @return Maximum number of partitions allowed
-   */
-  private static int getMaxPartitions(
-      ImmutableClassesGiraphConfiguration conf) {
-    if (conf.useCheckpointing()) {
-      return 5000;
-    } else {
-      return 50000;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/28cbe037/giraph-core/src/main/java/org/apache/giraph/utils/BlockingElementsSet.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/BlockingElementsSet.java b/giraph-core/src/main/java/org/apache/giraph/utils/BlockingElementsSet.java
new file mode 100644
index 0000000..5ed1c79
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/BlockingElementsSet.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.utils;
+
+import org.apache.hadoop.util.Progressable;
+
+import com.google.common.base.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Semaphore;
+
+/**
+ * Collects offered elements and lets a caller block until a given number of
+ * them is available. Assumes that at any point no more elements than we'll
+ * be asking for will be added (checked in getElements). Reusable.
+ *
+ * @param <T> Element type
+ */
+public class BlockingElementsSet<T> {
+  /** Semaphore holding one permit per offered, not yet retrieved element */
+  private final Semaphore semaphore = new Semaphore(0);
+  /** Elements offered so far and not yet retrieved */
+  private final List<T> elements =
+      Collections.synchronizedList(new ArrayList<T>());
+
+  /**
+   * Put an element in the set and release one permit for it
+   *
+   * @param element Element to put
+   */
+  public void offer(T element) {
+    elements.add(element);
+    semaphore.release(); // after add, so a woken waiter sees the element
+  }
+
+  /**
+   * Get one element when it becomes available,
+   * reporting progress while waiting
+   *
+   * @param progressable Progressable to report progress
+   * @return Element acquired
+   */
+  public T getElement(Progressable progressable) {
+    return getElements(1, progressable).get(0);
+  }
+
+  /**
+   * Wait until elementCount elements were offered, then remove and return
+   * all of them, reporting progress while waiting
+   *
+   * @param elementCount How many elements to wait for
+   * @param progressable Progressable to report progress
+   * @return New list with the elements acquired; the set is left empty
+   */
+  public List<T> getElements(int elementCount, Progressable progressable) {
+    ProgressableUtils.awaitSemaphorePermits(
+        semaphore, elementCount, progressable);
+    Preconditions.checkState(elements.size() == elementCount);
+    List<T> ret = new ArrayList<>(elements);
+    elements.clear();
+    return ret;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/28cbe037/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
index 3008248..88bb944 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ProgressableUtils.java
@@ -36,6 +36,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -132,6 +133,26 @@ public class ProgressableUtils {
   }
 
   /**
+   * Wait to acquire enough permits from {@link Semaphore}, while periodically
+   * reporting progress.
+   *
+   * @param semaphore    Semaphore
+   * @param permits      How many permits to acquire
+   * @param progressable Progressable for reporting progress (Job context)
+   */
+  public static void awaitSemaphorePermits(final Semaphore semaphore,
+      int permits, Progressable progressable) {
+    while (true) {
+      waitForever(new SemaphoreWaitable(semaphore, permits), progressable);
+      // Permits may have been taken by another thread since the wait
+      // returned; if tryAcquire fails, loop and wait again
+      if (semaphore.tryAcquire(permits)) {
+        return;
+      }
+    }
+  }
+
+  /**
    * Wait forever for waitable to finish. Periodically reports progress.
    *
    * @param waitable Waitable which we wait for
@@ -445,4 +466,41 @@ public class ProgressableUtils {
       return future.isDone();
     }
   }
+
+  /**
+   * {@link Waitable} for waiting on required number of permits in a
+   * {@link Semaphore} to become available.
+   */
+  private static class SemaphoreWaitable extends WaitableWithoutResult {
+    /** Semaphore to wait on */
+    private final Semaphore semaphore;
+    /** How many permits to wait on */
+    private final int permits;
+
+    /**
+     * Constructor
+     *
+     * @param semaphore Semaphore to wait on
+     * @param permits How many permits to wait on
+     */
+    public SemaphoreWaitable(Semaphore semaphore, int permits) {
+      this.semaphore = semaphore;
+      this.permits = permits;
+    }
+
+    @Override
+    public void waitFor(int msecs) throws InterruptedException {
+      boolean acquired =
+          semaphore.tryAcquire(permits, msecs, TimeUnit.MILLISECONDS);
+      // We only wait here, so return any permits we managed to acquire
+      if (acquired) {
+        semaphore.release(permits);
+      }
+    }
+
+    @Override
+    public boolean isFinished() {
+      return semaphore.availablePermits() >= permits;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/28cbe037/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index bf48ea8..b008f28 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -52,6 +52,7 @@ import org.apache.giraph.comm.netty.NettyWorkerAggregatorRequestProcessor;
 import org.apache.giraph.comm.netty.NettyWorkerClient;
 import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
 import org.apache.giraph.comm.netty.NettyWorkerServer;
+import org.apache.giraph.comm.requests.PartitionStatsRequest;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.Edge;
@@ -82,6 +83,7 @@ import org.apache.giraph.partition.PartitionOwner;
 import org.apache.giraph.partition.PartitionStats;
 import org.apache.giraph.partition.PartitionStore;
 import org.apache.giraph.partition.WorkerGraphPartitioner;
+import org.apache.giraph.utils.BlockingElementsSet;
 import org.apache.giraph.utils.CallableFactory;
 import org.apache.giraph.utils.CheckpointingUtils;
 import org.apache.giraph.utils.JMapHistoDumper;
@@ -156,6 +158,10 @@ public class BspServiceWorker<I extends WritableComparable,
   /** Have the partition exchange children (workers) changed? */
   private final BspEvent partitionExchangeChildrenChanged;
 
+  /** Addresses and partitions transfer */
+  private BlockingElementsSet<AddressesAndPartitionsWritable>
+      addressesAndPartitionsHolder = new BlockingElementsSet<>();
+
   /** Worker Context */
   private final WorkerContext workerContext;
 
@@ -702,31 +708,8 @@ else[HADOOP_NON_SECURE]*/
 
     registerHealth(getSuperstep());
 
-    String addressesAndPartitionsPath =
-        getAddressesAndPartitionsPath(getApplicationAttempt(),
-            getSuperstep());
     AddressesAndPartitionsWritable addressesAndPartitions =
-        new AddressesAndPartitionsWritable(
-            workerGraphPartitioner.createPartitionOwner().getClass());
-    try {
-      while (getZkExt().exists(addressesAndPartitionsPath, true) ==
-          null) {
-        getAddressesAndPartitionsReadyChangedEvent().waitForever();
-        getAddressesAndPartitionsReadyChangedEvent().reset();
-      }
-      WritableUtils.readFieldsFromZnode(
-          getZkExt(),
-          addressesAndPartitionsPath,
-          false,
-          null,
-          addressesAndPartitions);
-    } catch (KeeperException e) {
-      throw new IllegalStateException(
-          "startSuperstep: KeeperException getting assignments", e);
-    } catch (InterruptedException e) {
-      throw new IllegalStateException(
-          "startSuperstep: InterruptedException getting assignments", e);
-    }
+        addressesAndPartitionsHolder.getElement(getContext());
 
     workerInfoList.clear();
     workerInfoList = addressesAndPartitions.getWorkerInfos();
@@ -734,10 +717,6 @@ else[HADOOP_NON_SECURE]*/
 
     if (LOG.isInfoEnabled()) {
       LOG.info("startSuperstep: " + masterInfo);
-      LOG.info("startSuperstep: Ready for computation on superstep " +
-          getSuperstep() + " since worker " +
-          "selection and vertex range assignments are done in " +
-          addressesAndPartitionsPath);
     }
 
     getContext().setStatus("startSuperstep: " +
@@ -916,18 +895,14 @@ else[HADOOP_NON_SECURE]*/
     Collection<PartitionStats> finalizedPartitionStats =
         workerGraphPartitioner.finalizePartitionStats(
             partitionStatsList, getPartitionStore());
-    List<PartitionStats> finalizedPartitionStatsList =
-        new ArrayList<PartitionStats>(finalizedPartitionStats);
-    byte[] partitionStatsBytes =
-        WritableUtils.writeListToByteArray(finalizedPartitionStatsList);
+    workerClient.sendWritableRequest(masterInfo.getTaskId(),
+        new PartitionStatsRequest(finalizedPartitionStats));
     WorkerSuperstepMetrics metrics = new WorkerSuperstepMetrics();
     metrics.readFromRegistry();
     byte[] metricsBytes = WritableUtils.writeToByteArray(metrics);
 
     JSONObject workerFinishedInfoObj = new JSONObject();
     try {
-      workerFinishedInfoObj.put(JSONOBJ_PARTITION_STATS_KEY,
-          Base64.encodeBytes(partitionStatsBytes));
       workerFinishedInfoObj.put(JSONOBJ_NUM_MESSAGES_KEY, workerSentMessages);
       workerFinishedInfoObj.put(JSONOBJ_NUM_MESSAGE_BYTES_KEY,
         workerSentMessageBytes);
@@ -1849,4 +1824,10 @@ else[HADOOP_NON_SECURE]*/
   public WorkerInputSplitsHandler getInputSplitsHandler() {
     return inputSplitsHandler;
   }
+
+  @Override
+  public void addressesAndPartitionsReceived(
+      AddressesAndPartitionsWritable addressesAndPartitions) {
+    addressesAndPartitionsHolder.offer(addressesAndPartitions);
+  }
 }